Index: lsm-test/lsmtest.h ================================================================== --- lsm-test/lsmtest.h +++ lsm-test/lsmtest.h @@ -76,10 +76,11 @@ ** includes code for a couple of different lsm configurations, and for ** various types of fault injection and robustness testing. */ int test_lsm_open(const char *zFilename, int bClear, TestDb **ppDb); int test_lsm_lomem_open(const char *zFilename, int bClear, TestDb **ppDb); +int test_lsm_zip_open(const char *zFilename, int bClear, TestDb **ppDb); int test_lsm_small_open(const char *zFilename, int bClear, TestDb **ppDb); int test_lsm_mt2(const char *zFilename, int bClear, TestDb **ppDb); int test_lsm_mt3(const char *zFilename, int bClear, TestDb **ppDb); /* Functions in testutil.c. */ Index: lsm-test/lsmtest2.c ================================================================== --- lsm-test/lsmtest2.c +++ lsm-test/lsmtest2.c @@ -202,21 +202,22 @@ ** Open the LSM database identified by zFile and compute its checksum ** (a string, as returned by testCksumDatabase()). If the checksum is ** identical to zExpect1 or, if it is not NULL, zExpect2, the test passes. ** Otherwise, print an error message and set *pRc to 1. */ -void testCompareCksumLsmdb( +static void testCompareCksumLsmdb( const char *zFile, /* Path to LSM database */ + int bCompress, /* True if db is compressed */ const char *zExpect1, /* Expected checksum 1 */ const char *zExpect2, /* Expected checksum 2 (or NULL) */ int *pRc /* IN/OUT: Test case error code */ ){ if( *pRc==0 ){ char zCksum[TEST_CKSUM_BYTES]; TestDb *pDb; - pDb = testOpen("lsm", 0, pRc); + *pRc = tdb_lsm_open((bCompress?"compression=1 mmap=0":""), zFile, 0, &pDb); testCksumDatabase(pDb, zCksum); testClose(&pDb); if( *pRc==0 ){ int r1 = 0; @@ -247,13 +248,12 @@ /* ** This test verifies that if a system crash occurs while doing merge work ** on the db, no data is lost. 
*/ -static void crash_test1(int *pRc){ +static void crash_test1(int bCompress, int *pRc){ const char *DBNAME = "testdb.lsm"; - const DatasourceDefn defn = {TEST_DATASOURCE_RANDOM, 12, 16, 200, 200}; const int nRow = 5000; /* Database size */ const int nIter = 200; /* Number of test iterations */ const int nWork = 20; /* Maximum lsm_work() calls per iteration */ @@ -262,30 +262,38 @@ int i; int iDot = 0; Datasource *pData; CksumDb *pCksumDb; TestDb *pDb; + char *zCfg; + + const char *azConfig[2] = { + "page_size=1024 block_size=65536 write_buffer=16384 safety=2 mmap=0", + "page_size=1024 block_size=65536 write_buffer=16384 safety=2 " + " compression=1 mmap=0" + }; + assert( bCompress==0 || bCompress==1 ); /* Allocate datasource. And calculate the expected checksums. */ pData = testDatasourceNew(&defn); pCksumDb = testCksumArrayNew(pData, nRow, nRow, 1); /* Setup and save the initial database. */ - testSetupSavedLsmdb( - "page_size=1024 block_size=65536 write_buffer=16384 nmerge=7", - DBNAME, pData, 5000, pRc - ); + + zCfg = testMallocPrintf("%s nmerge=7", azConfig[bCompress]); + testSetupSavedLsmdb(zCfg, DBNAME, pData, 5000, pRc); + testFree(zCfg); for(i=0; izTest) ){ - p->x(pRc); + p->x(p->bCompress, pRc); testCaseFinish(*pRc); } } } Index: lsm-test/lsmtest_main.c ================================================================== --- lsm-test/lsmtest_main.c +++ lsm-test/lsmtest_main.c @@ -1331,29 +1331,33 @@ return rc; } static int do_insert(int nArg, char **azArg){ - const char *zConfig = 0; const char *zDb = "lsm"; TestDb *pDb = 0; int i; int rc; const int nRow = 1 * 1000 * 1000; DatasourceDefn defn = { TEST_DATASOURCE_RANDOM, 8, 15, 80, 150 }; Datasource *pData = 0; - if( nArg>2 ){ - testPrintError("Usage: insert ?DATABASE? 
?LSM-CONFIG?\n"); + if( nArg>1 ){ + testPrintError("Usage: insert ?DATABASE?\n"); return 1; } if( nArg==1 ){ zDb = azArg[0]; } - if( nArg==2 ){ zConfig = azArg[1]; } testMallocUninstall(tdb_lsm_env()); - rc = tdb_open(zDb, 0, 1, &pDb); + for(i=0; zDb[i] && zDb[i]!='='; i++); + if( zDb[i] ){ + rc = tdb_lsm_open(zDb, "testdb.lsm", 1, &pDb); + }else{ + rc = tdb_open(zDb, 0, 1, &pDb); + } + if( rc!=0 ){ testPrintError("Error opening db \"%s\": %d\n", zDb, rc); }else{ InsertWriteHook hook; memset(&hook, 0, sizeof(hook)); @@ -1360,13 +1364,10 @@ hook.pOut = fopen("writelog.txt", "w"); pData = testDatasourceNew(&defn); tdb_lsm_config_work_hook(pDb, do_insert_work_hook, 0); tdb_lsm_write_hook(pDb, do_insert_write_hook, (void *)&hook); - if( zConfig ){ - rc = tdb_lsm_config_str(pDb, zConfig); - } if( rc==0 ){ for(i=0; iszSector/4); + /* Fall-through */ case 2: pEnv->xWrite( pFile, (lsm_i64)i * pDb->szSector, aOld, pDb->szSector ); @@ -387,10 +388,65 @@ sqlite3_free(zFree); } /* ** End test VFS code. +************************************************************************** +*************************************************************************/ + +/************************************************************************* +************************************************************************** +** Begin test compression hooks. +*/ + +#ifdef HAVE_ZLIB +#include + +static int testZipBound(void *pCtx, int nSrc){ + return compressBound(nSrc); +} + +static int testZipCompress( + void *pCtx, /* Context pointer */ + char *aOut, int *pnOut, /* OUT: Buffer containing compressed data */ + const char *aIn, int nIn /* Buffer containing input data */ +){ + uLongf n = *pnOut; /* In/out buffer size for compress() */ + int rc; /* compress() return code */ + + rc = compress((Bytef*)aOut, &n, (Bytef*)aIn, nIn); + *pnOut = n; + return (rc==Z_OK ? 
0 : LSM_ERROR); +} + +static int testZipUncompress( + void *pCtx, /* Context pointer */ + char *aOut, int *pnOut, /* OUT: Buffer containing uncompressed data */ + const char *aIn, int nIn /* Buffer containing input data */ +){ + uLongf n = *pnOut; /* In/out buffer size for uncompress() */ + int rc; /* uncompress() return code */ + + rc = uncompress((Bytef*)aOut, &n, (Bytef*)aIn, nIn); + *pnOut = n; + return (rc==Z_OK ? 0 : LSM_ERROR); +} + +static int testConfigureCompression(lsm_db *pDb){ + static lsm_compress zip = { + 1, sizeof(lsm_compress), + 0, /* Context pointer (unused) */ + testZipBound, /* xBound method */ + testZipCompress, /* xCompress method */ + testZipUncompress /* xUncompress method */ + }; + return lsm_config(pDb, LSM_CONFIG_SET_COMPRESSION, &zip); +} +#endif /* ifdef HAVE_ZLIB */ + +/* +** End test compression hooks. ************************************************************************** *************************************************************************/ static int test_lsm_close(TestDb *pTestDb){ int i; @@ -615,10 +671,11 @@ if( p->xWork ) p->xWork(db, p->pWorkCtx); } #define TEST_NO_RECOVERY -1 #define TEST_THREADS -2 +#define TEST_COMPRESSION -3 static int test_lsm_config_str( LsmDb *pLsm, lsm_db *db, int bWorker, @@ -643,10 +700,13 @@ { "max_freelist", 0, LSM_CONFIG_MAX_FREELIST }, { "multi_proc", 0, LSM_CONFIG_MULTIPLE_PROCESSES }, { "worker_nmerge", 1, LSM_CONFIG_NMERGE }, { "test_no_recovery", 0, TEST_NO_RECOVERY }, { "threads", 0, TEST_THREADS }, +#ifdef HAVE_ZLIB + { "compression", 0, TEST_COMPRESSION }, +#endif { 0, 0 } }; const char *z = zStr; int nThread = 1; @@ -694,10 +754,15 @@ pLsm->bNoRecovery = iVal; break; case TEST_THREADS: nThread = iVal; break; +#ifdef HAVE_ZLIB + case TEST_COMPRESSION: + testConfigureCompression(db); + break; +#endif } } } }else if( z!=zStart ){ goto syntax_error; @@ -830,11 +895,27 @@ const char *zFilename, int bClear, TestDb **ppDb ){ const char *zCfg = - "page_size=256 block_size=65536 
write_buffer=16384 max_freelist=4 autocheckpoint=32768"; + "page_size=256 block_size=65536 write_buffer=16384 " + "max_freelist=4 autocheckpoint=32768 " + "mmap=0 " + ; + return testLsmOpen(zCfg, zFilename, bClear, ppDb); +} + +int test_lsm_zip_open( + const char *zFilename, + int bClear, + TestDb **ppDb +){ + const char *zCfg = + "page_size=256 block_size=65536 write_buffer=16384 " + "max_freelist=4 autocheckpoint=32768 compression=1 " + "mmap=0 " + ; return testLsmOpen(zCfg, zFilename, bClear, ppDb); } lsm_db *tdb_lsm(TestDb *pDb){ if( pDb->pMethods->xClose==test_lsm_close ){ Index: src/kvlsm.c ================================================================== --- src/kvlsm.c +++ src/kvlsm.c @@ -453,10 +453,12 @@ pNew->base.pStoreVfunc = &kvlsmMethods; pNew->base.pEnv = pEnv; rc = lsm_new(0, &pNew->pDb); if( rc==SQLITE4_OK ){ int i; + int bMmap = 0; + lsm_config(pNew->pDb, LSM_CONFIG_MMAP, &bMmap); for(i=0; ipDb, aConfig[i].eParam, &nVal); Index: src/lsm.h ================================================================== --- src/lsm.h +++ src/lsm.h @@ -31,10 +31,11 @@ /* 64-bit integer type used for file offsets.
*/ typedef long long int lsm_i64; /* 64-bit signed integer type */ /* Forward reference */ typedef struct lsm_env lsm_env; /* Runtime environment */ +typedef struct lsm_compress lsm_compress; /* Compression library functions */ /* Candidate values for the 3rd argument to lsm_env.xLock() */ #define LSM_LOCK_UNLOCK 0 #define LSM_LOCK_SHARED 1 #define LSM_LOCK_EXCL 2 @@ -42,11 +43,11 @@ /* ** Run-time environment used by LSM */ struct lsm_env { int nByte; /* Size of this structure in bytes */ - int iVersion; /* Version number of this structure */ + int iVersion; /* Version number of this structure (1) */ /****** file i/o ***********************************************/ void *pVfsCtx; int (*xFullpath)(lsm_env*, const char *, char *, int *); int (*xOpen)(lsm_env*, const char *, lsm_file **); int (*xRead)(lsm_file *, lsm_i64, void *, int); @@ -78,10 +79,27 @@ void (*xMutexLeave)(lsm_mutex *); /* Leave a mutex */ int (*xMutexHeld)(lsm_mutex *); /* Return true if mutex is held */ int (*xMutexNotHeld)(lsm_mutex *); /* Return true if mutex not held */ /****** other ****************************************************/ int (*xSleep)(lsm_env*, int microseconds); + + /* New fields may be added in future releases, in which case the + ** iVersion value will increase. */ +}; + +/* +** The compression library interface. +*/ +struct lsm_compress { + int nByte; /* Size of this structure in bytes */ + int iVersion; /* Version number of this structure (1) */ + + /* Compression library functions */ + void *pCtx; + int (*xBound)(void *, int nSrc); + int (*xCompress)(void *, char *, int *, const char *, int); + int (*xUncompress)(void *, char *, int *, const char *, int); /* New fields may be added in future releases, in which case the ** iVersion value will increase. */ }; @@ -190,10 +208,28 @@ ** There is no reason for an application to configure or query this ** parameter. It is only present because configuring a small value ** makes certain parts of the lsm code easier to test. 
** ** LSM_CONFIG_MULTIPLE_PROCESSES +** A read/write boolean parameter. This parameter may only be set before +** lsm_open() has been called. If true, the library uses shared-memory +** and posix advisory locks to co-ordinate access by clients from within +** multiple processes. Otherwise, if false, all database clients must be +** located in the same process. The default value is true. +** +** LSM_CONFIG_SET_COMPRESSION +** Set the compression methods used to compress and decompress database +** content. The argument to this option should be a pointer to a structure +** of type lsm_compress. The lsm_config() method takes a copy of the +** structure's contents. +** +** This option may only be used before lsm_open() is called. Invoking it +** after lsm_open() has been called results in an LSM_MISUSE error. +** +** LSM_CONFIG_GET_COMPRESSION +** Query the compression methods used to compress and decompress database +** content. */ #define LSM_CONFIG_WRITE_BUFFER 1 #define LSM_CONFIG_PAGE_SIZE 2 #define LSM_CONFIG_SAFETY 3 #define LSM_CONFIG_BLOCK_SIZE 4 @@ -203,10 +239,13 @@ #define LSM_CONFIG_USE_LOG 8 #define LSM_CONFIG_NMERGE 9 #define LSM_CONFIG_MAX_FREELIST 10 #define LSM_CONFIG_MULTIPLE_PROCESSES 11 #define LSM_CONFIG_AUTOCHECKPOINT 12 + +#define LSM_CONFIG_SET_COMPRESSION 13 +#define LSM_CONFIG_GET_COMPRESSION 14 #define LSM_SAFETY_OFF 0 #define LSM_SAFETY_NORMAL 1 #define LSM_SAFETY_FULL 2 Index: src/lsmInt.h ================================================================== --- src/lsmInt.h +++ src/lsmInt.h @@ -101,12 +101,12 @@ typedef unsigned short int u16; typedef unsigned int u32; typedef lsm_i64 i64; typedef unsigned long long int u64; -/* A page number is an integer. */ -typedef int Pgno; +/* A page number is a 64-bit integer.
*/ +typedef i64 Pgno; #ifdef LSM_DEBUG int lsmErrorBkpt(int); #else # define lsmErrorBkpt(x) (x) @@ -141,11 +141,11 @@ /* ** Hard limit on the number of free-list entries that may be stored in ** a checkpoint (the remainder are stored as a system record in the LSM). ** See also LSM_CONFIG_MAX_FREELIST. */ -#define LSM_MAX_FREELIST_ENTRIES 100 +#define LSM_MAX_FREELIST_ENTRIES 24 #define LSM_ATTEMPTS_BEFORE_PROTOCOL 10000 /* @@ -302,10 +302,11 @@ int nDfltBlksz; /* Configured by LSM_CONFIG_BLOCK_SIZE */ int nMaxFreelist; /* Configured by LSM_CONFIG_MAX_FREELIST */ int bMmap; /* Configured by LSM_CONFIG_MMAP */ int nAutockpt; /* Configured by LSM_CONFIG_AUTOCHECKPOINT */ int bMultiProc; /* Configured by L_C_MULTIPLE_PROCESSES */ + lsm_compress compress; /* Compression callbacks */ /* Sub-system handles */ FileSystem *pFS; /* On-disk portion of database */ Database *pDatabase; /* Database shared data */ @@ -340,13 +341,13 @@ u32 aSnapshot[LSM_META_PAGE_SIZE / sizeof(u32)]; }; struct Segment { Pgno iFirst; /* First page of this run */ - Pgno iLast; /* Last page of this run */ - Pgno iRoot; /* Root page number (if any) */ - int nSize; /* Size of this run in pages */ + Pgno iLastPg; /* Last page of this run */ + Pgno iRoot; /* Root page number (if any) */ + int nSize; /* Size of this run in pages */ }; /* ** iSplitTopic/pSplitKey/nSplitKey: ** If nRight>0, this buffer contains a copy of the largest key that has @@ -371,13 +372,10 @@ ** snapshot. ** ** It is assumed that code that uses an instance of this structure has ** access to the associated Level struct. ** -** bHierReadonly: -** True if the b-tree hierarchy is currently read-only. -** ** iOutputOff: ** The byte offset to write to next within the last page of the ** output segment. 
*/ struct MergeInput { @@ -389,11 +387,10 @@ MergeInput *aInput; /* Array nInput entries in size */ MergeInput splitkey; /* Location in file of current splitkey */ int nSkip; /* Number of separators entries to skip */ int iOutputOff; /* Write offset on output page */ Pgno iCurrentPtr; /* Current pointer value */ - int bHierReadonly; /* True if b-tree heirarchies are read-only */ }; /* ** The first argument to this macro is a pointer to a Segment structure. ** Returns true if the structure instance indicates that the separators @@ -483,11 +480,11 @@ i64 iId; /* Snapshot id */ i64 iLogOff; /* Log file offset */ /* Used by worker snapshots only */ int nBlock; /* Number of blocks in database file */ - u32 aiAppend[LSM_APPLIST_SZ]; /* Append point list */ + Pgno aiAppend[LSM_APPLIST_SZ]; /* Append point list */ Freelist freelist; /* Free block list */ int nFreelistOvfl; /* Number of extra free-list entries in LSM */ u32 nWrite; /* Total number of pages written to disk */ }; #define LSM_INITIAL_SNAPSHOT_ID 11 @@ -620,11 +617,11 @@ /* Creating, populating, gobbling and deleting sorted runs. */ void lsmFsGobble(lsm_db *, Segment *, Pgno *, int); int lsmFsSortedDelete(FileSystem *, Snapshot *, int, Segment *); int lsmFsSortedFinish(FileSystem *, Segment *); int lsmFsSortedAppend(FileSystem *, Snapshot *, Segment *, Page **); -int lsmFsPhantomMaterialize(FileSystem *, Snapshot *, Segment *); +int lsmFsSortedPadding(FileSystem *, Snapshot *, Segment *); /* Functions to retrieve the lsm_env pointer from a FileSystem or Page object */ lsm_env *lsmFsEnv(FileSystem *); lsm_env *lsmPageEnv(Page *); FileSystem *lsmPageFS(Page *); @@ -632,14 +629,14 @@ int lsmFsSectorSize(FileSystem *); void lsmSortedSplitkey(lsm_db *, Level *, int *); /* Reading sorted run content. 
*/ +int lsmFsDbPageLast(FileSystem *pFS, Segment *pSeg, Page **ppPg); int lsmFsDbPageGet(FileSystem *, Pgno, Page **); int lsmFsDbPageNext(Segment *, Page *, int eDir, Page **); -int lsmFsPageWrite(Page *); u8 *lsmFsPageData(Page *, int *); int lsmFsPageRelease(Page *); int lsmFsPagePersist(Page *); void lsmFsPageRef(Page *); Pgno lsmFsPageNumber(Page *); @@ -650,10 +647,11 @@ int lsmFsMetaPageGet(FileSystem *, int, int, MetaPage **); int lsmFsMetaPageRelease(MetaPage *); u8 *lsmFsMetaPageData(MetaPage *, int *); #ifdef LSM_DEBUG +int lsmFsDbPageIsLast(Segment *pSeg, Page *pPg); int lsmFsIntegrityCheck(lsm_db *); #endif int lsmFsPageWritable(Page *); @@ -698,11 +696,10 @@ void lsmSortedRemap(lsm_db *pDb); void lsmSortedFreeLevel(lsm_env *pEnv, Level *); -int lsmSortedFlushDb(lsm_db *); int lsmSortedAdvanceAll(lsm_db *pDb); int lsmSortedLoadMerge(lsm_db *, Level *, u32 *, int *); int lsmSortedLoadFreelist(lsm_db *pDb, void **, int *); Index: src/lsm_ckpt.c ================================================================== --- src/lsm_ckpt.c +++ src/lsm_ckpt.c @@ -53,20 +53,21 @@ ** ** For each level in the database, a level record. Formatted as follows: ** ** 0. Age of the level. ** 1. The number of right-hand segments (nRight, possibly 0), -** 2. Segment record for left-hand segment (4 integers defined below), -** 3. Segment record for each right-hand segment (4 integers defined below), +** 2. Segment record for left-hand segment (8 integers defined below), +** 3. Segment record for each right-hand segment (8 integers defined below), ** 4. If nRight>0, The number of segments involved in the merge ** 5. if nRight>0, Current nSkip value (see Merge structure defn.), ** 6. For each segment in the merge: -** 5a. Page number of next cell to read during merge +** 5a. Page number of next cell to read during merge (this field +** is 64-bits - 2 integers) ** 5b. Cell number of next cell to read during merge -** 7. Page containing current split-key. +** 7. 
Page containing current split-key (64-bits - 2 integers). ** 8. Cell within page containing current split-key. -** 9. Current pointer value. +** 9. Current pointer value (64-bits - 2 integers). ** ** The freelist. ** ** 1. Number of free-list entries stored in checkpoint header. ** 2. For each entry: @@ -82,11 +83,12 @@ ** The checksum: ** ** 1. Checksum value 1. ** 2. Checksum value 2. ** -** In the above, a segment record is: +** In the above, a segment record consists of the following four 64-bit +** fields (converted to 2 * u32 by storing the MSW followed by LSW): ** ** 1. First page of array, ** 2. Last page of array, ** 3. Root page of array (or 0), ** 4. Size of array in pages. @@ -105,15 +107,21 @@ ** * For each level in the database not undergoing a merge, add 1. ** ** * For each level in the database that is undergoing a merge, add ** the number of segments on the rhs of the level. ** -** A level record not undergoing a merge is 6 integers. A level record +** A level record not undergoing a merge is 10 integers. A level record ** with nRhs rhs segments and (nRhs+1) input segments (i.e. including the -** separators from the next level) is (6*nRhs+12) integers. The maximum -** per right-hand-side level is therefore 12 integers. So the maximum -** size of all level records in a checkpoint is 12*40=480 integers. +** separators from the next level) is (11*nRhs+20) integers. The maximum +** per right-hand-side level is therefore 21 integers. So the maximum +** size of all level records in a checkpoint is 21*40=820 integers. +** +** TODO: Before pointer values were changed from 32 to 64 bits, the above +** used to come to 420 bytes - leaving significant space for a free-list +** prefix. No more. To fix this, reduce the size of the level records in +** a db snapshot, and improve management of the free-list tail in +** lsm_sorted.c. 
*/ #define LSM_MAX_RHS_SEGMENTS 40 /* ** LARGE NUMBERS OF FREELIST ENTRIES: @@ -159,13 +167,11 @@ #define LSM_LITTLE_ENDIAN (*(u8 *)(&one)) /* Sizes, in integers, of various parts of the checkpoint. */ #define CKPT_HDR_SIZE 9 #define CKPT_LOGPTR_SIZE 4 -#define CKPT_SEGMENT_SIZE 4 -#define CKPT_CKSUM_SIZE 2 -#define CKPT_APPENDLIST_SIZE LSM_APPLIST_SZ +#define CKPT_APPENDLIST_SIZE (LSM_APPLIST_SZ * 2) /* A #define to describe each integer in the checkpoint header. */ #define CKPT_HDR_ID_MSW 0 #define CKPT_HDR_ID_LSW 1 #define CKPT_HDR_NCKPT 2 @@ -259,10 +265,28 @@ ckptChecksum(p->aCkpt, nCkpt+2, &aCksum[0], &aCksum[1]); ckptSetValue(p, nCkpt, aCksum[0], pRc); ckptSetValue(p, nCkpt+1, aCksum[1], pRc); } } + +static void ckptAppend64(CkptBuffer *p, int *piOut, i64 iVal, int *pRc){ + int iOut = *piOut; + ckptSetValue(p, iOut++, (iVal >> 32) & 0xFFFFFFFF, pRc); + ckptSetValue(p, iOut++, (iVal & 0xFFFFFFFF), pRc); + *piOut = iOut; +} + +static i64 ckptRead64(u32 *a){ + return (((i64)a[0]) << 32) + (i64)a[1]; +} + +static i64 ckptGobble64(u32 *a, int *piIn){ + int iIn = *piIn; + *piIn += 2; + return ckptRead64(&a[iIn]); +} + /* ** Append a 6-value segment record corresponding to pSeg to the checkpoint ** buffer passed as the third argument. 
*/ @@ -270,18 +294,14 @@ Segment *pSeg, CkptBuffer *p, int *piOut, int *pRc ){ - int iOut = *piOut; - - ckptSetValue(p, iOut++, pSeg->iFirst, pRc); - ckptSetValue(p, iOut++, pSeg->iLast, pRc); - ckptSetValue(p, iOut++, pSeg->iRoot, pRc); - ckptSetValue(p, iOut++, pSeg->nSize, pRc); - - *piOut = iOut; + ckptAppend64(p, piOut, pSeg->iFirst, pRc); + ckptAppend64(p, piOut, pSeg->iLastPg, pRc); + ckptAppend64(p, piOut, pSeg->iRoot, pRc); + ckptAppend64(p, piOut, pSeg->nSize, pRc); } static void ckptExportLevel( Level *pLevel, /* Level object to serialize */ CkptBuffer *p, /* Append new level record to this ckpt */ @@ -306,16 +326,16 @@ || pMerge->nInput==pLevel->nRight+1 ); ckptSetValue(p, iOut++, pMerge->nInput, pRc); ckptSetValue(p, iOut++, pMerge->nSkip, pRc); for(i=0; inInput; i++){ - ckptSetValue(p, iOut++, pMerge->aInput[i].iPg, pRc); + ckptAppend64(p, &iOut, pMerge->aInput[i].iPg, pRc); ckptSetValue(p, iOut++, pMerge->aInput[i].iCell, pRc); } - ckptSetValue(p, iOut++, pMerge->splitkey.iPg, pRc); + ckptAppend64(p, &iOut, pMerge->splitkey.iPg, pRc); ckptSetValue(p, iOut++, pMerge->splitkey.iCell, pRc); - ckptSetValue(p, iOut++, pMerge->iCurrentPtr, pRc); + ckptAppend64(p, &iOut, pMerge->iCurrentPtr, pRc); } *piOut = iOut; } @@ -333,12 +353,11 @@ assert( iOut==CKPT_HDR_LO_MSW ); if( bFlush ){ i64 iOff = pDb->treehdr.iOldLog; - ckptSetValue(p, iOut++, (iOff >> 32) & 0xFFFFFFFF, pRc); - ckptSetValue(p, iOut++, (iOff & 0xFFFFFFFF), pRc); + ckptAppend64(p, &iOut, iOff, pRc); ckptSetValue(p, iOut++, pDb->treehdr.oldcksum0, pRc); ckptSetValue(p, iOut++, pDb->treehdr.oldcksum1, pRc); }else{ for(; iOut<=CKPT_HDR_LO_CKSUM2; iOut++){ ckptSetValue(p, iOut, pDb->pShmhdr->aSnap2[iOut], pRc); @@ -353,17 +372,15 @@ CkptBuffer *p, /* Checkpoint buffer to write to */ int *piOut, /* IN/OUT: Offset within checkpoint buffer */ int *pRc /* IN/OUT: Error code */ ){ int i; - int iOut = *piOut; - u32 *aiAppend = db->pWorker->aiAppend; + Pgno *aiAppend = db->pWorker->aiAppend; - for(i=0; 
iiFirst==0 && pSegment->iLast==0 ); + assert( pSegment->iFirst==0 && pSegment->iLastPg==0 ); assert( pSegment->nSize==0 && pSegment->iRoot==0 ); - pSegment->iFirst = aIn[iIn++]; - pSegment->iLast = aIn[iIn++]; - pSegment->iRoot = aIn[iIn++]; - pSegment->nSize = aIn[iIn++]; - - *piIn = iIn; + pSegment->iFirst = ckptGobble64(aIn, piIn); + pSegment->iLastPg = ckptGobble64(aIn, piIn); + pSegment->iRoot = ckptGobble64(aIn, piIn); + pSegment->nSize = ckptGobble64(aIn, piIn); + assert( pSegment->iFirst ); } static int ckptSetupMerge(lsm_db *pDb, u32 *aInt, int *piIn, Level *pLevel){ Merge *pMerge; /* Allocated Merge object */ int nInput; /* Number of input segments in merge */ @@ -493,19 +507,18 @@ /* Populate the Merge object. */ pMerge->aInput = (MergeInput *)&pMerge[1]; pMerge->nInput = nInput; pMerge->iOutputOff = -1; - pMerge->bHierReadonly = 1; pMerge->nSkip = (int)aInt[iIn++]; for(i=0; iaInput[i].iPg = (Pgno)aInt[iIn++]; + pMerge->aInput[i].iPg = ckptGobble64(aInt, &iIn); pMerge->aInput[i].iCell = (int)aInt[iIn++]; } - pMerge->splitkey.iPg = (Pgno)aInt[iIn++]; + pMerge->splitkey.iPg = ckptGobble64(aInt, &iIn); pMerge->splitkey.iCell = (int)aInt[iIn++]; - pMerge->iCurrentPtr = (int)aInt[iIn++]; + pMerge->iCurrentPtr = ckptGobble64(aInt, &iIn); /* Set *piIn and return LSM_OK. 
*/ *piIn = iIn; return LSM_OK; } @@ -887,11 +900,11 @@ 0, /* CKPT_HDR_NLEVEL */ 0, /* CKPT_HDR_PGSZ */ 0, /* CKPT_HDR_OVFL */ 0, /* CKPT_HDR_NWRITE */ 0, 0, 1234, 5678, /* The log pointer and initial checksum */ - 0, 0, 0, 0, /* The append list */ + 0,0,0,0, 0,0,0,0, /* The append list */ 0, /* The free block list */ 0, 0 /* Space for checksum values */ }; u32 nCkpt = array_size(aCkpt); ShmHeader *pShm = pDb->pShmhdr; @@ -1049,11 +1062,11 @@ Snapshot *pNew; pNew = (Snapshot *)lsmMallocZeroRc(pDb->pEnv, sizeof(Snapshot), &rc); if( rc==LSM_OK ){ int nFree; - int nCopy; + int i; int nLevel = (int)aCkpt[CKPT_HDR_NLEVEL]; int iIn = CKPT_HDR_SIZE + CKPT_APPENDLIST_SIZE + CKPT_LOGPTR_SIZE; pNew->iId = lsmCheckpointId(aCkpt, 0); pNew->nBlock = aCkpt[CKPT_HDR_NBLOCK]; @@ -1060,12 +1073,14 @@ pNew->nWrite = aCkpt[CKPT_HDR_NWRITE]; rc = ckptLoadLevels(pDb, aCkpt, &iIn, nLevel, &pNew->pLevel); pNew->iLogOff = lsmCheckpointLogOffset(aCkpt); /* Make a copy of the append-list */ - nCopy = sizeof(u32) * LSM_APPLIST_SZ; - memcpy(pNew->aiAppend, &aCkpt[CKPT_HDR_SIZE+CKPT_LOGPTR_SIZE], nCopy); + for(i=0; iaiAppend[i] = ckptRead64(a); + } /* Copy the free-list */ if( bInclFreelist ){ pNew->nFreelistOvfl = aCkpt[CKPT_HDR_OVFL]; nFree = aCkpt[iIn++]; Index: src/lsm_file.c ================================================================== --- src/lsm_file.c +++ src/lsm_file.c @@ -8,11 +8,11 @@ ** May you find forgiveness for yourself and forgive others. ** May you share freely, never taking more than you give. ** ************************************************************************* ** -** DATABASE FILE FORMAT +** NORMAL DATABASE FILE FORMAT ** ** The following database file format concepts are used by the code in ** this file to read and write the database file. 
** ** Pages: @@ -83,17 +83,77 @@ ** lsmFsSyncLog ** lsmFsReadLog ** lsmFsTruncateLog ** lsmFsCloseAndDeleteLog ** +** COMPRESSED DATABASE FILE FORMAT +** +** The compressed database file format is very similar to the normal format. +** The file still begins with two 4KB meta-pages (which are never compressed). +** It is still divided into blocks. +** +** The first and last four bytes of each block are reserved for 32-bit +** pointer values. Similar to the way four bytes are carved from the end of +** the first and last page of each block in uncompressed databases. From +** the point of view of the upper layer, all pages are the same size - this +** is different from the uncompressed format where the first and last pages +** on each block are 4 bytes smaller than the others. +** +** Pages are stored in variable length compressed form, as follows: +** +** * 3-byte size field containing the size of the compressed page image +** in bytes. The most significant bit of each byte of the size field +** is always set. The remaining 7 bits are used to store a 21-bit +** integer value (in big-endian order - the first byte in the field +** contains the most significant 7 bits). Since the maximum allowed +** size of a compressed page image is (2^17 - 1) bytes, there are +** actually 4 unused bits in the size field. +** +** In other words, if the size of the compressed page image is nSz, +** the header can be serialized as follows: +** +** u8 aHdr[3] +** aHdr[0] = 0x80 | (u8)(nSz >> 14); +** aHdr[1] = 0x80 | (u8)(nSz >> 7); +** aHdr[2] = 0x80 | (u8)(nSz >> 0); +** +** * Compressed page image. +** +** * A second copy of the 3-byte record header. +** +** A page number is a byte offset into the database file. So the smallest +** possible page number is 8192 (immediately after the two meta-pages). +** The first and root page of a segment are identified by a page number +** corresponding to the byte offset of the first byte in the corresponding +** page record. 
The last page of a segment is identified by the byte offset +** of the last byte in its record. +** +** Unlike uncompressed pages, compressed page records may span blocks. +** +** Sometimes, in order to avoid touching sectors that contain synced data +** when writing, it is necessary to insert unused space between compressed +** page records. This can be done as follows: +** +** * For less than 6 bytes of empty space, the first and last byte +** of the free space contain the total number of free bytes. For +** example: +** +** Block of 4 free bytes: 0x04 0x?? 0x?? 0x04 +** Block of 2 free bytes: 0x02 0x02 +** A single free byte: 0x01 +** +** * For 6 or more bytes of empty space, a record similar to a +** compressed page record is added to the segment. A padding record +** is distinguished from a compressed page record by the most +** significant bit of the second byte of the size field, which is +** cleared instead of set. */ #include "lsmInt.h" #include #include #include - /* ** File-system object. Each database connection allocates a single instance ** of the following structure. It is used for all access to the database and ** log files. @@ -122,10 +182,17 @@ /* r/w file descriptors for both files. */ LsmFile *pLsmFile; lsm_file *fdDb; /* Database file */ lsm_file *fdLog; /* Log file */ + int szSector; /* Database file sector size */ + + /* If this is a compressed database, a pointer to the compression methods. + ** For an uncompressed database, a NULL pointer. */ + lsm_compress *pCompress; + u8 *aBuffer; /* Buffer to compress into */ + int nBuffer; /* Allocated size of aBuffer[] in bytes */ /* mmap() mode things */ int bUseMmap; /* True to use mmap() to access db file */ void *pMap; /* Current mapping of database file */ i64 nMap; /* Bytes mapped at pMap */ @@ -145,21 +212,36 @@ Page **apHash; /* nHash Hash slots */ }; /* ** Database page handle. 
+** +** pSeg: +** When lsmFsSortedAppend() is called on a compressed database, the new +** page is not assigned a page number or location in the database file +** immediately. Instead, these are assigned by the lsmFsPagePersist() call +** right before it writes the compressed page image to disk. +** +** The lsmFsSortedAppend() function sets the pSeg pointer to point to the +** segment that the new page will be a part of. It is unset by +** lsmFsPagePersist() after the page is written to disk. */ struct Page { u8 *aData; /* Buffer containing page data */ int nData; /* Bytes of usable data at aData[] */ - int iPg; /* Page number */ + Pgno iPg; /* Page number */ int nRef; /* Number of outstanding references */ int flags; /* Combination of PAGE_XXX flags */ Page *pHashNext; /* Next page in hash table slot */ Page *pLruNext; /* Next page in LRU list */ Page *pLruPrev; /* Previous page in LRU list */ FileSystem *pFS; /* File system that owns this page */ + + /* Only used in compressed database mode: */ + int nCompress; /* Compressed size (or 0 for uncomp. db) */ + int nCompressPrev; /* Compressed size of prev page */ + Segment *pSeg; /* Segment this page will be written to */ }; /* ** Meta-data page handle. There are two meta-data pages at the start of ** the database file, each FileSystem.nMetasize bytes in size. @@ -182,10 +264,26 @@ ** Number of pgsz byte pages omitted from the start of block 1. The start ** of block 1 contains two 4096 byte meta pages (8192 bytes in total). */ #define BLOCK1_HDR_SIZE(pgsz) LSM_MAX(1, 8192/(pgsz)) +/* +** If NDEBUG is not defined, set a breakpoint in function lsmIoerrBkpt() +** to catch IO errors. 
+*/ +#ifndef NDEBUG +static void lsmIoerrBkpt(){ + static int nErr = 0; + nErr++; +} +static int IOERR_WRAPPER(int rc){ + if( rc!=LSM_OK ) lsmIoerrBkpt(); + return rc; +} +#else +# define IOERR_WRAPPER(rc) (rc) +#endif /* ** Wrappers around the VFS methods of the lsm_env object: ** ** lsmEnvOpen() @@ -206,35 +304,35 @@ lsm_file *pFile, lsm_i64 iOff, void *pRead, int nRead ){ - return pEnv->xRead(pFile, iOff, pRead, nRead); + return IOERR_WRAPPER( pEnv->xRead(pFile, iOff, pRead, nRead) ); } static int lsmEnvWrite( lsm_env *pEnv, lsm_file *pFile, lsm_i64 iOff, - void *pWrite, + const void *pWrite, int nWrite ){ - return pEnv->xWrite(pFile, iOff, pWrite, nWrite); + return IOERR_WRAPPER( pEnv->xWrite(pFile, iOff, (void *)pWrite, nWrite) ); } static int lsmEnvSync(lsm_env *pEnv, lsm_file *pFile){ - return pEnv->xSync(pFile); + return IOERR_WRAPPER( pEnv->xSync(pFile) ); } static int lsmEnvSectorSize(lsm_env *pEnv, lsm_file *pFile){ return pEnv->xSectorSize(pFile); } int lsmEnvClose(lsm_env *pEnv, lsm_file *pFile){ - return pEnv->xClose(pFile); + return IOERR_WRAPPER( pEnv->xClose(pFile) ); } static int lsmEnvTruncate(lsm_env *pEnv, lsm_file *pFile, lsm_i64 nByte){ - return pEnv->xTruncate(pFile, nByte); + return IOERR_WRAPPER( pEnv->xTruncate(pFile, nByte) ); } static int lsmEnvUnlink(lsm_env *pEnv, const char *zDel){ - return pEnv->xUnlink(pEnv, zDel); + return IOERR_WRAPPER( pEnv->xUnlink(pEnv, zDel) ); } static int lsmEnvRemap( lsm_env *pEnv, lsm_file *pFile, i64 szMin, @@ -395,11 +493,15 @@ pFS->nPagesize = LSM_DFLT_PAGE_SIZE; pFS->nBlocksize = LSM_DFLT_BLOCK_SIZE; pFS->nMetasize = 4 * 1024; pFS->pDb = pDb; pFS->pEnv = pDb->pEnv; - pFS->bUseMmap = pDb->bMmap; + if( pDb->compress.xCompress ){ + pFS->pCompress = &pDb->compress; + }else{ + pFS->bUseMmap = pDb->bMmap; + } /* Make a copy of the database and log file names. 
*/ memcpy(pFS->zDb, zDb, nDb+1); memcpy(pFS->zLog, zDb, nDb); memcpy(&pFS->zLog[nDb], "-log", 5); @@ -424,10 +526,12 @@ } if( rc!=LSM_OK ){ lsmFsClose(pFS); pFS = 0; + }else{ + pFS->szSector = lsmEnvSectorSize(pFS->pEnv, pFS->fdDb); } } pDb->pFS = pFS; return rc; @@ -452,10 +556,11 @@ if( pFS->fdDb ) lsmEnvClose(pFS->pEnv, pFS->fdDb ); if( pFS->fdLog ) lsmEnvClose(pFS->pEnv, pFS->fdLog ); lsmFree(pEnv, pFS->pLsmFile); lsmFree(pEnv, pFS->apHash); + lsmFree(pEnv, pFS->aBuffer); lsmFree(pEnv, pFS); } } void lsmFsDeferClose(FileSystem *pFS, LsmFile **pp){ @@ -528,47 +633,85 @@ } /* ** Return the page number of the first page on block iBlock. Blocks are ** numbered starting from 1. +** +** For a compressed database, page numbers are byte offsets. The first +** page on each block is the byte offset immediately following the 4-byte +** "previous block" pointer at the start of each block. */ static Pgno fsFirstPageOnBlock(FileSystem *pFS, int iBlock){ - const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize); - int iPg; - if( iBlock==1 ){ - iPg = 1 + ((pFS->nMetasize*2 + pFS->nPagesize - 1) / pFS->nPagesize); + Pgno iPg; + if( pFS->pCompress ){ + if( iBlock==1 ){ + iPg = pFS->nMetasize * 2 + 4; + }else{ + iPg = pFS->nBlocksize * (Pgno)(iBlock-1) + 4; + } }else{ - iPg = 1 + (iBlock-1) * nPagePerBlock; + const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize); + if( iBlock==1 ){ + iPg = 1 + ((pFS->nMetasize*2 + pFS->nPagesize - 1) / pFS->nPagesize); + }else{ + iPg = 1 + (iBlock-1) * nPagePerBlock; + } } return iPg; } /* ** Return the page number of the last page on block iBlock. Blocks are ** numbered starting from 1. +** +** For a compressed database, page numbers are byte offsets. The first +** page on each block is the byte offset of the byte immediately before +** the 4-byte "next block" pointer at the end of each block. 
*/ static Pgno fsLastPageOnBlock(FileSystem *pFS, int iBlock){ - const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize); - return iBlock * nPagePerBlock; + if( pFS->pCompress ){ + return pFS->nBlocksize * (Pgno)iBlock - 1 - 4; + }else{ + const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize); + return iBlock * nPagePerBlock; + } +} + +/* +** Return the block number of the block that page iPg is located on. +** Blocks are numbered starting from 1. +*/ +static int fsPageToBlock(FileSystem *pFS, Pgno iPg){ + if( pFS->pCompress ){ + return (iPg / pFS->nBlocksize) + 1; + }else{ + return 1 + ((iPg-1) / (pFS->nBlocksize / pFS->nPagesize)); + } } /* ** Return true if page iPg is the last page on its block. +** +** This function is only called in non-compressed database mode. */ static int fsIsLast(FileSystem *pFS, Pgno iPg){ const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize); + assert( !pFS->pCompress ); return ( iPg && (iPg % nPagePerBlock)==0 ); } /* ** Return true if page iPg is the first page on its block. +** +** This function is only called in non-compressed database mode. */ static int fsIsFirst(FileSystem *pFS, Pgno iPg){ const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize); - return ( - (iPg % nPagePerBlock)==1 - || (iPgpCompress ); + + return ( (iPg % nPagePerBlock)==1 + || (iPgpFS, pPage->iPg) || fsIsLast(pPage->pFS, pPage->iPg) + int bShort = (pPage->pFS->pCompress==0 && + (fsIsFirst(pPage->pFS, pPage->iPg) || fsIsLast(pPage->pFS, pPage->iPg)) ); assert( bShort==!!(pPage->flags & PAGE_SHORT) ); assert( PAGE_SHORT==4 ); #endif *pnData = pPage->pFS->nPagesize - (pPage->flags & PAGE_SHORT); @@ -591,10 +734,11 @@ /* ** Return the page number of a page. */ Pgno lsmFsPageNumber(Page *pPage){ + assert( (pPage->flags & PAGE_DIRTY)==0 ); return pPage ? pPage->iPg : 0; } /* ** Page pPg is currently part of the LRU list belonging to pFS. 
Remove @@ -685,21 +829,19 @@ else if( pPg->pFS->bUseMmap ){ fsPageRemoveFromLru(pPg->pFS, pPg); } lsmFree(pPg->pFS->pEnv, pPg); } - -int fsPageToBlock(FileSystem *pFS, Pgno iPg){ - return 1 + ((iPg-1) / (pFS->nBlocksize / pFS->nPagesize)); -} - static void fsGrowMapping( FileSystem *pFS, i64 iSz, int *pRc ){ + /* This function won't work with compressed databases yet. */ + assert( pFS->pCompress==0 ); + if( *pRc==LSM_OK && iSz>pFS->nMap ){ Page *pFix; int rc; u8 *aOld = pFS->pMap; rc = lsmEnvRemap(pFS->pEnv, pFS->fdDb, iSz, &pFS->pMap, &pFS->nMap); @@ -712,19 +854,252 @@ lsmSortedRemap(pFS->pDb); } *pRc = rc; } } + +static int fsPageGet(FileSystem *, Pgno, int, Page **, int *); + +/* +** Parameter iBlock is a database file block. This function reads the value +** stored in the blocks "next block" pointer and stores it in *piNext. +** LSM_OK is returned if everything is successful, or an LSM error code +** otherwise. +*/ +static int fsBlockNext( + FileSystem *pFS, /* File-system object handle */ + int iBlock, /* Read field from this block */ + int *piNext /* OUT: Next block in linked list */ +){ + int rc; + + assert( pFS->bUseMmap==0 || pFS->pCompress==0 ); + if( pFS->pCompress ){ + i64 iOff; /* File offset to read data from */ + u8 aNext[4]; /* 4-byte pointer read from db file */ + + iOff = (i64)iBlock * pFS->nBlocksize - sizeof(aNext); + rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iOff, aNext, sizeof(aNext)); + if( rc==LSM_OK ){ + *piNext = (int)lsmGetU32(aNext); + } + }else{ + const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize); + Page *pLast; + rc = fsPageGet(pFS, iBlock*nPagePerBlock, 0, &pLast, 0); + if( rc==LSM_OK ){ + *piNext = fsPageToBlock(pFS, lsmGetU32(&pLast->aData[pFS->nPagesize-4])); + lsmFsPageRelease(pLast); + } + } + return rc; +} + +/* +** Return the page number of the last page on the same block as page iPg. 
+*/ +Pgno fsLastPageOnPagesBlock(FileSystem *pFS, Pgno iPg){ + return fsLastPageOnBlock(pFS, fsPageToBlock(pFS, iPg)); +} + +/* +** This function is only called in compressed database mode. +*/ +static int fsReadData( + FileSystem *pFS, /* File-system handle */ + i64 iOff, /* Read data from this offset */ + u8 *aData, /* Buffer to read data into */ + int nData /* Number of bytes to read */ +){ + i64 iEob; /* End of block */ + int nRead; + int rc; + + assert( pFS->pCompress ); + + iEob = fsLastPageOnPagesBlock(pFS, iOff) + 1; + nRead = LSM_MIN(iEob - iOff, nData); + + rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iOff, aData, nRead); + if( rc==LSM_OK && nRead!=nData ){ + int iBlk; + + rc = fsBlockNext(pFS, fsPageToBlock(pFS, iOff), &iBlk); + if( rc==LSM_OK ){ + i64 iOff2 = fsFirstPageOnBlock(pFS, iBlk); + rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iOff2, &aData[nRead], nData-nRead); + } + } + + return rc; +} + +/* +** Parameter iBlock is a database file block. This function reads the value +** stored in the blocks "previous block" pointer and stores it in *piPrev. +** LSM_OK is returned if everything is successful, or an LSM error code +** otherwise. +*/ +static int fsBlockPrev( + FileSystem *pFS, /* File-system object handle */ + int iBlock, /* Read field from this block */ + int *piPrev /* OUT: Previous block in linked list */ +){ + int rc = LSM_OK; /* Return code */ + + assert( pFS->bUseMmap==0 || pFS->pCompress==0 ); + assert( iBlock>0 ); + + if( pFS->pCompress ){ + i64 iOff = fsFirstPageOnBlock(pFS, iBlock) - 4; + u8 aPrev[4]; /* 4-byte pointer read from db file */ + rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iOff, aPrev, sizeof(aPrev)); + if( rc==LSM_OK ){ + *piPrev = (int)lsmGetU32(aPrev); + } + }else{ + assert( 0 ); + } + return rc; +} + +/* +** Encode and decode routines for record size fields. +*/ +static void putRecordSize(u8 *aBuf, int nByte, int bFree){ + aBuf[0] = (u8)(nByte >> 14) | 0x80; + aBuf[1] = ((u8)(nByte >> 7) & 0x7F) | (bFree ? 
0x00 : 0x80); + aBuf[2] = (u8)nByte | 0x80; +} +static int getRecordSize(u8 *aBuf, int *pbFree){ + int nByte; + nByte = (aBuf[0] & 0x7F) << 14; + nByte += (aBuf[1] & 0x7F) << 7; + nByte += (aBuf[2] & 0x7F); + *pbFree = !(aBuf[1] & 0x80); + return nByte; +} + +static int fsSubtractOffset(FileSystem *pFS, i64 iOff, int iSub, i64 *piRes){ + i64 iStart; + int iBlk; + int rc; + + assert( pFS->pCompress ); + + iStart = fsFirstPageOnBlock(pFS, fsPageToBlock(pFS, iOff)); + if( (iOff-iSub)>=iStart ){ + *piRes = (iOff-iSub); + return LSM_OK; + } + + rc = fsBlockPrev(pFS, fsPageToBlock(pFS, iOff), &iBlk); + *piRes = fsLastPageOnBlock(pFS, iBlk) - iSub + (iOff - iStart + 1); + return rc; +} + +static int fsAddOffset(FileSystem *pFS, i64 iOff, int iAdd, i64 *piRes){ + i64 iEob; + int iBlk; + int rc; + + assert( pFS->pCompress ); + + iEob = fsLastPageOnPagesBlock(pFS, iOff); + if( (iOff+iAdd)<=iEob ){ + *piRes = (iOff+iAdd); + return LSM_OK; + } + + rc = fsBlockNext(pFS, fsPageToBlock(pFS, iOff), &iBlk); + *piRes = fsFirstPageOnBlock(pFS, iBlk) + iAdd - (iEob - iOff + 1); + return rc; +} + +static int fsAllocateBuffer(FileSystem *pFS){ + assert( pFS->pCompress ); + if( pFS->aBuffer==0 ){ + pFS->nBuffer = pFS->pCompress->xBound(pFS->pCompress->pCtx, pFS->nPagesize); + if( pFS->nBuffer<(pFS->szSector+6) ){ + pFS->nBuffer = pFS->szSector+6; + } + pFS->aBuffer = lsmMalloc(pFS->pEnv, LSM_MAX(pFS->nBuffer, pFS->nPagesize)); + if( pFS->aBuffer==0 ) return LSM_NOMEM_BKPT; + } + return LSM_OK; +} + +/* +** This function is only called in compressed database mode. It reads and +** uncompresses the compressed data for page pPg from the database and +** populates the pPg->aData[] buffer and pPg->nCompress field. +** +** LSM_OK is returned if successful, or an LSM error code otherwise. 
+*/ +static int fsReadPagedata( + FileSystem *pFS, /* File-system handle */ + Page *pPg, /* Page to read and uncompress data for */ + int *pnSpace /* OUT: Total bytes of free space */ +){ + lsm_compress *p = pFS->pCompress; + i64 iOff = pPg->iPg; + u8 aSz[3]; + int rc; + + assert( p && pPg->nCompress==0 ); + + if( fsAllocateBuffer(pFS) ) return LSM_NOMEM; + + rc = fsReadData(pFS, iOff, aSz, sizeof(aSz)); + + if( rc==LSM_OK ){ + int bFree; + if( aSz[0] & 0x80 ){ + pPg->nCompress = (int)getRecordSize(aSz, &bFree); + }else{ + pPg->nCompress = (int)aSz[0] - sizeof(aSz)*2; + bFree = 1; + } + if( bFree ){ + if( pnSpace ){ + *pnSpace = pPg->nCompress + sizeof(aSz)*2; + }else{ + rc = LSM_CORRUPT_BKPT; + } + }else{ + rc = fsAddOffset(pFS, iOff, 3, &iOff); + if( rc==LSM_OK ){ + if( pPg->nCompress>pFS->nBuffer ){ + rc = LSM_CORRUPT_BKPT; + }else{ + rc = fsReadData(pFS, iOff, pFS->aBuffer, pPg->nCompress); + } + if( rc==LSM_OK ){ + int n = pFS->nPagesize; + rc = p->xUncompress(p->pCtx, + (char *)pPg->aData, &n, + (const char *)pFS->aBuffer, pPg->nCompress + ); + if( rc==LSM_OK && n!=pPg->nData ){ + rc = LSM_CORRUPT_BKPT; + } + } + } + } + } + return rc; +} /* ** Return a handle for a database page. 
*/ -int fsPageGet( +static int fsPageGet( FileSystem *pFS, /* File-system handle */ Pgno iPg, /* Page id */ int noContent, /* True to not load content from disk */ - Page **ppPg /* OUT: New page handle */ + Page **ppPg, /* OUT: New page handle */ + int *pnSpace /* OUT: Bytes of free space */ ){ Page *p; int iHash; int rc = LSM_OK; @@ -763,89 +1138,81 @@ } if( p==0 ){ rc = fsPageBuffer(pFS, 1, &p); if( rc==LSM_OK ){ + int nSpace = 0; p->iPg = iPg; p->nRef = 0; p->pFS = pFS; assert( p->flags==0 || p->flags==PAGE_FREE ); - if( fsIsLast(pFS, iPg) || fsIsFirst(pFS, iPg) ) p->flags |= PAGE_SHORT; + if( pFS->pCompress==0 && (fsIsLast(pFS, iPg) || fsIsFirst(pFS, iPg)) ){ + p->flags |= PAGE_SHORT; + } + p->nData = pFS->nPagesize - (p->flags & PAGE_SHORT); #ifdef LSM_DEBUG memset(p->aData, 0x56, pFS->nPagesize); #endif assert( p->pLruNext==0 && p->pLruPrev==0 ); if( noContent==0 ){ - int nByte = pFS->nPagesize; - i64 iOff; - - iOff = (i64)(iPg-1) * pFS->nPagesize; - rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iOff, p->aData, nByte); + if( pFS->pCompress ){ + rc = fsReadPagedata(pFS, p, &nSpace); + }else{ + int nByte = pFS->nPagesize; + i64 iOff = (i64)(iPg-1) * pFS->nPagesize; + rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iOff, p->aData, nByte); + } pFS->nRead++; } /* If the xRead() call was successful (or not attempted), link the ** page into the page-cache hash-table. Otherwise, if it failed, ** free the buffer. 
*/ - if( rc==LSM_OK ){ + if( rc==LSM_OK && nSpace==0 ){ p->pHashNext = pFS->apHash[iHash]; - p->nData = pFS->nPagesize - (p->flags & PAGE_SHORT); pFS->apHash[iHash] = p; }else{ fsPageBufferFree(p); p = 0; + if( pnSpace ) *pnSpace = nSpace; } } }else if( p->nRef==0 ){ fsPageRemoveFromLru(pFS, p); } - assert( (rc==LSM_OK && p) || (rc!=LSM_OK && p==0) ); + assert( (rc==LSM_OK && (p || (pnSpace && *pnSpace))) + || (rc!=LSM_OK && p==0) + ); } - if( rc==LSM_OK ){ + if( rc==LSM_OK && p ){ pFS->nOut += (p->nRef==0); p->nRef++; } *ppPg = p; return rc; } -static int fsBlockNext( - FileSystem *pFS, - int iBlock, - int *piNext -){ - const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize); - Page *pLast; - int rc; - - rc = fsPageGet(pFS, iBlock*nPagePerBlock, 0, &pLast); - if( rc==LSM_OK ){ - *piNext = fsPageToBlock(pFS, lsmGetU32(&pLast->aData[pFS->nPagesize-4])); - lsmFsPageRelease(pLast); - } - return rc; -} static int fsRunEndsBetween( Segment *pRun, Segment *pIgnore, - int iFirst, - int iLast + Pgno iFirst, + Pgno iLast ){ return (pRun!=pIgnore && ( (pRun->iFirst>=iFirst && pRun->iFirst<=iLast) - || (pRun->iLast>=iFirst && pRun->iLast<=iLast) + || (pRun->iLastPg>=iFirst && pRun->iLastPg<=iLast) )); } static int fsLevelEndsBetween( Level *pLevel, Segment *pIgnore, - int iFirst, - int iLast + Pgno iFirst, + Pgno iLast ){ int i; if( fsRunEndsBetween(&pLevel->lhs, pIgnore, iFirst, iLast) ){ return 1; @@ -870,11 +1237,11 @@ int iLast; /* Last page on block iBlk */ Level *pLevel; /* Used to iterate through levels */ int iIn; /* Used to iterate through append points */ int iOut = 0; /* Used to output append points */ - u32 *aApp = pSnapshot->aiAppend; + Pgno *aApp = pSnapshot->aiAppend; iFirst = fsFirstPageOnBlock(pFS, iBlk); iLast = fsLastPageOnBlock(pFS, iBlk); /* Check if any other run in the snapshot has a start or end page @@ -912,18 +1279,18 @@ int iBlk; int iLastBlk; iBlk = fsPageToBlock(pFS, pDel->iFirst); - iLastBlk = fsPageToBlock(pFS, pDel->iLast); + iLastBlk = 
fsPageToBlock(pFS, pDel->iLastPg); /* Mark all blocks currently used by this sorted run as free */ while( iBlk && rc==LSM_OK ){ int iNext = 0; if( iBlk!=iLastBlk ){ rc = fsBlockNext(pFS, iBlk, &iNext); - }else if( bZero==0 && pDel->iLast!=fsLastPageOnBlock(pFS, iLastBlk) ){ + }else if( bZero==0 && pDel->iLastPg!=fsLastPageOnBlock(pFS, iLastBlk) ){ break; } rc = fsFreeBlock(pFS, pSnapshot, pDel, iBlk); iBlk = iNext; } @@ -943,40 +1310,10 @@ } } return iRet; } -#if 0 -void fsOldGobble( - FileSystem *pFS, - Snapshot *pSnapshot, - Segment *pRun, - Pgno iPg -){ - if( iPg!=pRun->iFirst ){ - int rc = LSM_OK; - int iBlk = fsPageToBlock(pFS, pRun->iFirst); - int iFirstBlk = fsPageToBlock(pFS, iPg); - - pRun->nSize += (pRun->iFirst - fsFirstPageOnBlock(pFS, iBlk)); - pRun->iFirst = iPg; - while( rc==LSM_OK && iBlk!=iFirstBlk ){ - int iNext = 0; - rc = fsBlockNext(pFS, iBlk, &iNext); - if( rc==LSM_OK ) rc = fsFreeBlock(pFS, pSnapshot, 0, iBlk); - pRun->nSize -= ( - 1 + fsLastPageOnBlock(pFS, iBlk) - fsFirstPageOnBlock(pFS, iBlk) - ); - iBlk = iNext; - } - - pRun->nSize -= (pRun->iFirst - fsFirstPageOnBlock(pFS, iBlk)); - assert( pRun->nSize>0 ); - } -} -#endif - /* ** Argument aPgno is an array of nPgno page numbers. All pages belong to ** the segment pRun. This function gobbles from the start of the run to the ** first page that appears in aPgno[] (i.e. so that the aPgno[] entry is ** the new first page of the run). @@ -1013,10 +1350,55 @@ pRun->nSize -= (pRun->iFirst - fsFirstPageOnBlock(pFS, iBlk)); assert( pRun->nSize>0 ); } +static int fsNextPageOffset( + FileSystem *pFS, /* File system object */ + Segment *pSeg, /* Segment to move within */ + Pgno iPg, /* Offset of current page */ + int nByte, /* Size of current page including headers */ + Pgno *piNext /* OUT: Offset of next page. 
Or zero (EOF) */ +){ + Pgno iNext; + int rc; + + assert( pFS->pCompress ); + + rc = fsAddOffset(pFS, iPg, nByte-1, &iNext); + if( pSeg && iNext==pSeg->iLastPg ){ + iNext = 0; + }else if( rc==LSM_OK ){ + rc = fsAddOffset(pFS, iNext, 1, &iNext); + } + + *piNext = iNext; + return rc; +} + +static int fsGetPageBefore(FileSystem *pFS, i64 iOff, Pgno *piPrev){ + u8 aSz[3]; + int rc; + i64 iRead; + + rc = fsSubtractOffset(pFS, iOff, sizeof(aSz), &iRead); + if( rc==LSM_OK ) rc = fsReadData(pFS, iRead, aSz, sizeof(aSz)); + + if( rc==LSM_OK ){ + int bFree; + int nSz; + if( aSz[2] & 0x80 ){ + nSz = getRecordSize(aSz, &bFree) + sizeof(aSz)*2; + }else{ + nSz = (int)(aSz[2] & 0x7F); + bFree = 1; + } + rc = fsSubtractOffset(pFS, iOff, nSz, piPrev); + } + + return rc; +} /* ** The first argument to this function is a valid reference to a database ** file page that is part of a sorted run. If parameter eDir is -1, this ** function attempts to locate and load the previous page in the same run. @@ -1036,51 +1418,79 @@ ** ** Page references returned by this function should be released by the ** caller using lsmFsPageRelease(). 
*/ int lsmFsDbPageNext(Segment *pRun, Page *pPg, int eDir, Page **ppNext){ + int rc = LSM_OK; FileSystem *pFS = pPg->pFS; - int iPg = pPg->iPg; - - assert( eDir==1 || eDir==-1 ); - if( eDir<0 ){ - if( pRun && iPg==pRun->iFirst ){ - *ppNext = 0; - return LSM_OK; - }else if( fsIsFirst(pFS, iPg) ){ - iPg = lsmGetU32(&pPg->aData[pFS->nPagesize-4]); - }else{ - iPg--; - } - }else{ - if( pRun && iPg==pRun->iLast ){ - *ppNext = 0; - return LSM_OK; - }else if( fsIsLast(pFS, iPg) ){ - iPg = lsmGetU32(&pPg->aData[pFS->nPagesize-4]); - }else{ - iPg++; - } - } - - return fsPageGet(pFS, iPg, 0, ppNext); + Pgno iPg = pPg->iPg; + + if( pFS->pCompress ){ + int nSpace = pPg->nCompress + 2*3; + + do { + if( eDir>0 ){ + rc = fsNextPageOffset(pFS, pRun, iPg, nSpace, &iPg); + }else{ + if( iPg==pRun->iFirst ){ + iPg = 0; + }else{ + rc = fsGetPageBefore(pFS, iPg, &iPg); + } + } + + nSpace = 0; + if( iPg!=0 ){ + rc = fsPageGet(pFS, iPg, 0, ppNext, &nSpace); + assert( (*ppNext==0)==(rc!=LSM_OK || nSpace>0) ); + }else{ + *ppNext = 0; + } + }while( nSpace>0 && rc==LSM_OK ); + + }else{ + assert( eDir==1 || eDir==-1 ); + if( eDir<0 ){ + if( pRun && iPg==pRun->iFirst ){ + *ppNext = 0; + return LSM_OK; + }else if( fsIsFirst(pFS, iPg) ){ + iPg = lsmGetU32(&pPg->aData[pFS->nPagesize-4]); + }else{ + iPg--; + } + }else{ + if( pRun && iPg==pRun->iLastPg ){ + *ppNext = 0; + return LSM_OK; + }else if( fsIsLast(pFS, iPg) ){ + iPg = lsmGetU32(&pPg->aData[pFS->nPagesize-4]); + }else{ + iPg++; + } + } + rc = fsPageGet(pFS, iPg, 0, ppNext, 0); + } + + return rc; } static Pgno findAppendPoint(FileSystem *pFS){ int i; - u32 *aiAppend = pFS->pDb->pWorker->aiAppend; + Pgno *aiAppend = pFS->pDb->pWorker->aiAppend; u32 iRet = 0; for(i=LSM_APPLIST_SZ-1; iRet==0 && i>=0; i--){ if( (iRet = aiAppend[i]) ) aiAppend[i] = 0; } return iRet; } /* -** Append a page to file iFile. Return a reference to it. lsmFsPageWrite() -** has already been called on the returned reference. +** Append a page to file iFile. 
Set the ref-count to 1 and return a pointer +** to it. The page is writable until either lsmFsPagePersist() is called on +** it or the ref-count drops to zero. */ int lsmFsSortedAppend( FileSystem *pFS, Snapshot *pSnapshot, Segment *p, @@ -1089,56 +1499,73 @@ int rc = LSM_OK; Page *pPg = 0; *ppOut = 0; int iApp = 0; int iNext = 0; - int iPrev = p->iLast; - - if( iPrev==0 ){ - iApp = findAppendPoint(pFS); - }else if( fsIsLast(pFS, iPrev) ){ - Page *pLast = 0; - rc = fsPageGet(pFS, iPrev, 0, &pLast); - if( rc!=LSM_OK ) return rc; - iApp = lsmGetU32(&pLast->aData[pFS->nPagesize-4]); - lsmFsPageRelease(pLast); - }else{ - iApp = iPrev + 1; - } - - /* If this is the first page allocated, or if the page allocated is the - ** last in the block, allocate a new block here. */ - if( iApp==0 || fsIsLast(pFS, iApp) ){ - int iNew; /* New block number */ - - lsmBlockAllocate(pFS->pDb, &iNew); - if( iApp==0 ){ - iApp = fsFirstPageOnBlock(pFS, iNew); - }else{ - iNext = fsFirstPageOnBlock(pFS, iNew); - } - } - - /* Grab the new page. */ - pPg = 0; - rc = fsPageGet(pFS, iApp, 1, &pPg); - assert( rc==LSM_OK || pPg==0 ); - - /* If this is the first or last page of a block, fill in the pointer - ** value at the end of the new page. */ - if( rc==LSM_OK ){ - p->nSize++; - p->iLast = iApp; - if( p->iFirst==0 ) p->iFirst = iApp; - pPg->flags |= PAGE_DIRTY; - - if( fsIsLast(pFS, iApp) ){ - lsmPutU32(&pPg->aData[pFS->nPagesize-4], iNext); - }else - if( fsIsFirst(pFS, iApp) ){ - lsmPutU32(&pPg->aData[pFS->nPagesize-4], iPrev); - } + int iPrev = p->iLastPg; + + if( pFS->pCompress ){ + /* In compressed database mode the page is not assigned a page number + ** or location in the database file at this point. This will be done + ** by the lsmFsPagePersist() call. 
*/ + rc = fsPageBuffer(pFS, 1, &pPg); + if( rc==LSM_OK ){ + pPg->pFS = pFS; + pPg->pSeg = p; + pPg->iPg = 0; + pPg->flags |= PAGE_DIRTY; + pPg->nData = pFS->nPagesize; + assert( pPg->aData ); + + pPg->nRef = 1; + pFS->nOut++; + } + }else{ + if( iPrev==0 ){ + iApp = findAppendPoint(pFS); + }else if( fsIsLast(pFS, iPrev) ){ + int iNext; + rc = fsBlockNext(pFS, fsPageToBlock(pFS, iPrev), &iNext); + if( rc!=LSM_OK ) return rc; + iApp = fsFirstPageOnBlock(pFS, iNext); + }else{ + iApp = iPrev + 1; + } + + /* If this is the first page allocated, or if the page allocated is the + ** last in the block, allocate a new block here. */ + if( iApp==0 || fsIsLast(pFS, iApp) ){ + int iNew; /* New block number */ + + lsmBlockAllocate(pFS->pDb, &iNew); + if( iApp==0 ){ + iApp = fsFirstPageOnBlock(pFS, iNew); + }else{ + iNext = fsFirstPageOnBlock(pFS, iNew); + } + } + + /* Grab the new page. */ + pPg = 0; + rc = fsPageGet(pFS, iApp, 1, &pPg, 0); + assert( rc==LSM_OK || pPg==0 ); + + /* If this is the first or last page of a block, fill in the pointer + ** value at the end of the new page. */ + if( rc==LSM_OK ){ + p->nSize++; + p->iLastPg = iApp; + if( p->iFirst==0 ) p->iFirst = iApp; + pPg->flags |= PAGE_DIRTY; + + if( fsIsLast(pFS, iApp) ){ + lsmPutU32(&pPg->aData[pFS->nPagesize-4], iNext); + }else + if( fsIsFirst(pFS, iApp) ){ + lsmPutU32(&pPg->aData[pFS->nPagesize-4], iPrev); + } + } } *ppOut = pPg; return rc; } @@ -1146,49 +1573,79 @@ /* ** Mark the sorted run passed as the second argument as finished. */ int lsmFsSortedFinish(FileSystem *pFS, Segment *p){ int rc = LSM_OK; - if( p ){ - const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize); + if( p && p->iLastPg ){ + int iBlk; /* Check if the last page of this run happens to be the last of a block. ** If it is, then an extra block has already been allocated for this run. ** Shift this extra block back to the free-block list. ** ** Otherwise, add the first free page in the last block used by the run ** to the lAppend list. 
*/ - if( (p->iLast % nPagePerBlock)==0 ){ + iBlk = fsPageToBlock(pFS, p->iLastPg); + if( fsLastPageOnPagesBlock(pFS, p->iLastPg)!=p->iLastPg ){ + int i; + Pgno *aiAppend = pFS->pDb->pWorker->aiAppend; + for(i=0; iiLastPg+1; + break; + } + } + }else if( pFS->pCompress==0 ){ Page *pLast; - rc = fsPageGet(pFS, p->iLast, 0, &pLast); + rc = fsPageGet(pFS, p->iLastPg, 0, &pLast, 0); if( rc==LSM_OK ){ int iPg = (int)lsmGetU32(&pLast->aData[pFS->nPagesize-4]); - int iBlk = fsPageToBlock(pFS, iPg); - lsmBlockRefree(pFS->pDb, iBlk); + lsmBlockRefree(pFS->pDb, fsPageToBlock(pFS, iPg)); lsmFsPageRelease(pLast); } }else{ - int i; - u32 *aiAppend = pFS->pDb->pWorker->aiAppend; - for(i=0; iiLast+1; - break; - } + int iBlk = 0; + rc = fsBlockNext(pFS, fsPageToBlock(pFS, p->iLastPg), &iBlk); + if( rc==LSM_OK ){ + lsmBlockRefree(pFS->pDb, iBlk); } } } return rc; } /* ** Obtain a reference to page number iPg. */ -int lsmFsDbPageGet(FileSystem *pFS, int iPg, Page **ppPg){ +int lsmFsDbPageGet(FileSystem *pFS, Pgno iPg, Page **ppPg){ assert( pFS ); - return fsPageGet(pFS, iPg, 0, ppPg); + return fsPageGet(pFS, iPg, 0, ppPg, 0); +} + +/* +** Obtain a reference to the last page in the segment passed as the +** second argument. +*/ +int lsmFsDbPageLast(FileSystem *pFS, Segment *pSeg, Page **ppPg){ + int rc; + Pgno iPg = pSeg->iLastPg; + if( pFS->pCompress ){ + int nSpace; + iPg++; + do { + nSpace = 0; + rc = fsGetPageBefore(pFS, iPg, &iPg); + if( rc==LSM_OK ){ + rc = fsPageGet(pFS, iPg, 0, ppPg, &nSpace); + } + }while( rc==LSM_OK && nSpace>0 ); + + }else{ + rc = fsPageGet(pFS, iPg, 0, ppPg, 0); + } + return rc; } /* ** Return a reference to meta-page iPg. If successful, LSM_OK is returned ** and *ppPg populated with the new page reference. 
The reference should @@ -1276,26 +1733,131 @@ u8 *lsmFsMetaPageData(MetaPage *pPg, int *pnData){ if( pnData ) *pnData = pPg->pFS->nMetasize; return pPg->aData; } -/* -** Notify the file-system that the page needs to be written back to disk -** when the reference count next drops to zero. -*/ -int lsmFsPageWrite(Page *pPg){ - pPg->flags |= PAGE_DIRTY; - return LSM_OK; -} - /* ** Return true if page is currently writable. */ int lsmFsPageWritable(Page *pPg){ return (pPg->flags & PAGE_DIRTY) ? 1 : 0; } +/* +** Append raw data to a segment. This function is only used in compressed +** database mode. +*/ +static Pgno fsAppendData( + FileSystem *pFS, /* File-system handle */ + Segment *pSeg, /* Segment to append to */ + const u8 *aData, /* Buffer containing data to write */ + int nData, /* Size of buffer aData[] in bytes */ + int *pRc /* IN/OUT: Error code */ +){ + Pgno iRet = 0; + int rc = *pRc; + assert( pFS->pCompress ); + if( rc==LSM_OK ){ + int nRem; + int nWrite; + Pgno iLastOnBlock; + Pgno iApp = pSeg->iLastPg+1; + + /* If this is the first data written into the segment, find an append-point + ** or allocate a new block. */ + if( iApp==1 ){ + pSeg->iFirst = iApp = findAppendPoint(pFS); + if( iApp==0 ){ + int iBlk; + rc = lsmBlockAllocate(pFS->pDb, &iBlk); + pSeg->iFirst = iApp = fsFirstPageOnBlock(pFS, iBlk); + } + } + iRet = iApp; + + /* Write as much data as is possible at iApp (usually all of it). */ + iLastOnBlock = fsLastPageOnPagesBlock(pFS, iApp); + if( rc==LSM_OK ){ + int nSpace = iLastOnBlock - iApp + 1; + nWrite = LSM_MIN(nData, nSpace); + nRem = nData - nWrite; + assert( nWrite>=0 ); + if( nWrite!=0 ){ + rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iApp, aData, nWrite); + } + iApp += nWrite; + } + + /* If required, allocate a new block and write the rest of the data + ** into it. Set the next and previous block pointers to link the new + ** block to the old. 
*/ + assert( nRem<=0 || (iApp-1)==iLastOnBlock ); + if( rc==LSM_OK && (iApp-1)==iLastOnBlock ){ + u8 aPtr[4]; /* Space to serialize a u32 */ + int iBlk; /* New block number */ + + if( nWrite>0 ){ + /* Allocate a new block. */ + rc = lsmBlockAllocate(pFS->pDb, &iBlk); + + /* Set the "next" pointer on the old block */ + if( rc==LSM_OK ){ + assert( iApp==(fsPageToBlock(pFS, iApp)*pFS->nBlocksize)-4 ); + lsmPutU32(aPtr, iBlk); + rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iApp, aPtr, sizeof(aPtr)); + } + + /* Set the "prev" pointer on the new block */ + if( rc==LSM_OK ){ + Pgno iWrite; + lsmPutU32(aPtr, fsPageToBlock(pFS, iApp)); + iWrite = fsFirstPageOnBlock(pFS, iBlk); + rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iWrite-4, aPtr, sizeof(aPtr)); + if( nRem>0 ) iApp = iWrite; + } + }else{ + /* The next block is already allocated. */ + assert( nRem>0 ); + rc = fsBlockNext(pFS, fsPageToBlock(pFS, iApp), &iBlk); + iApp = fsFirstPageOnBlock(pFS, iBlk); + } + + /* Write the remaining data into the new block */ + if( rc==LSM_OK && nRem>0 ){ + rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iApp, &aData[nWrite], nRem); + iApp += nRem; + } + } + + pSeg->iLastPg = iApp-1; + *pRc = rc; + } + + return iRet; +} + +/* +** This function is only called in compressed database mode. It +** compresses the contents of page pPg and writes the result to the +** buffer at pFS->aBuffer. The size of the compressed data is stored in +** pPg->nCompress. +** +** If buffer pFS->aBuffer[] has not been allocated then this function +** allocates it. If this fails, LSM_NOMEM is returned. Otherwise, LSM_OK. 
+*/ +static int fsCompressIntoBuffer(FileSystem *pFS, Page *pPg){ + lsm_compress *p = pFS->pCompress; + + if( fsAllocateBuffer(pFS) ) return LSM_NOMEM; + assert( pPg->nData==pFS->nPagesize ); + + pPg->nCompress = pFS->nBuffer; + return p->xCompress(p->pCtx, + (char *)pFS->aBuffer, &pPg->nCompress, + (const char *)pPg->aData, pPg->nData + ); +} /* ** If the page passed as an argument is dirty, update the database file ** (or mapping of the database file) with its current contents and mark ** the page as clean. @@ -1305,32 +1867,113 @@ */ int lsmFsPagePersist(Page *pPg){ int rc = LSM_OK; if( pPg && (pPg->flags & PAGE_DIRTY) ){ FileSystem *pFS = pPg->pFS; - i64 iOff; /* Offset to write within database file */ - - iOff = (i64)pFS->nPagesize * (i64)(pPg->iPg-1); - if( pFS->bUseMmap==0 ){ - rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iOff, pPg->aData,pFS->nPagesize); - }else if( pPg->flags & PAGE_FREE ){ - fsGrowMapping(pFS, iOff + pFS->nPagesize, &rc); - if( rc==LSM_OK ){ - u8 *aTo = &((u8 *)(pFS->pMap))[iOff]; - memcpy(aTo, pPg->aData, pFS->nPagesize); - lsmFree(pFS->pEnv, pPg->aData); - pPg->aData = aTo; - pPg->flags &= ~PAGE_FREE; - fsPageAddToLru(pFS, pPg); + + if( pFS->pCompress ){ + int iHash; /* Hash key of assigned page number */ + u8 aSz[3]; /* pPg->nCompress as a 24-bit big-endian */ + assert( pPg->pSeg && pPg->iPg==0 && pPg->nCompress==0 ); + + /* Compress the page image. */ + rc = fsCompressIntoBuffer(pFS, pPg); + + /* Serialize the compressed size into buffer aSz[] */ + putRecordSize(aSz, pPg->nCompress, 0); + + /* Write the serialized page record into the database file. 
*/ + pPg->iPg = fsAppendData(pFS, pPg->pSeg, aSz, sizeof(aSz), &rc); + fsAppendData(pFS, pPg->pSeg, pFS->aBuffer, pPg->nCompress, &rc); + fsAppendData(pFS, pPg->pSeg, aSz, sizeof(aSz), &rc); + + /* Now that it has a page number, insert the page into the hash table */ + iHash = fsHashKey(pFS->nHash, pPg->iPg); + pPg->pHashNext = pFS->apHash[iHash]; + pFS->apHash[iHash] = pPg; + + pPg->pSeg->nSize += (sizeof(aSz) * 2) + pPg->nCompress; + + }else{ + i64 iOff; /* Offset to write within database file */ + iOff = (i64)pFS->nPagesize * (i64)(pPg->iPg-1); + if( pFS->bUseMmap==0 ){ + rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iOff, pPg->aData,pFS->nPagesize); + }else if( pPg->flags & PAGE_FREE ){ + fsGrowMapping(pFS, iOff + pFS->nPagesize, &rc); + if( rc==LSM_OK ){ + u8 *aTo = &((u8 *)(pFS->pMap))[iOff]; + memcpy(aTo, pPg->aData, pFS->nPagesize); + lsmFree(pFS->pEnv, pPg->aData); + pPg->aData = aTo; + pPg->flags &= ~PAGE_FREE; + fsPageAddToLru(pFS, pPg); + } } } pPg->flags &= ~PAGE_DIRTY; pFS->nWrite++; } return rc; } + +/* +** For non-compressed databases, this function is a no-op. For compressed +** databases, it adds a padding record to the segment passed as the third +** argument. +** +** The size of the padding records is selected so that the last byte +** written is the last byte of a disk sector. This means that if a +** snapshot is taken and checkpointed, subsequent worker processes will +** not write to any sector that contains checkpointed data. 
+*/ +int lsmFsSortedPadding( + FileSystem *pFS, + Snapshot *pSnapshot, + Segment *pSeg +){ + int rc = LSM_OK; + if( pFS->pCompress ){ + Pgno iLast2; + Pgno iLast = pSeg->iLastPg; /* Current last page of segment */ + int nPad; /* Bytes of padding required */ + u8 aSz[3]; + + iLast2 = (1 + iLast/pFS->szSector) * pFS->szSector - 1; + assert( fsPageToBlock(pFS, iLast)==fsPageToBlock(pFS, iLast2) ); + nPad = iLast2 - iLast; + + if( iLast2>fsLastPageOnPagesBlock(pFS, iLast) ){ + nPad -= 4; + } + assert( nPad>=0 ); + + if( nPad>=6 ){ + pSeg->nSize += nPad; + nPad -= 6; + putRecordSize(aSz, nPad, 1); + fsAppendData(pFS, pSeg, aSz, sizeof(aSz), &rc); + memset(pFS->aBuffer, 0, nPad); + fsAppendData(pFS, pSeg, pFS->aBuffer, nPad, &rc); + fsAppendData(pFS, pSeg, aSz, sizeof(aSz), &rc); + }else if( nPad>0 ){ + u8 aBuf[5] = {0,0,0,0,0}; + aBuf[0] = (u8)nPad; + aBuf[nPad-1] = (u8)nPad; + fsAppendData(pFS, pSeg, aBuf, nPad, &rc); + } + + assert( rc!=LSM_OK + || pSeg->iLastPg==fsLastPageOnPagesBlock(pFS, pSeg->iLastPg) + || ((pSeg->iLastPg + 1) % pFS->szSector)==0 + ); + } + + return rc; +} + /* ** Increment the reference count on the page object passed as the first ** argument. 
*/ @@ -1464,20 +2107,20 @@ LsmString str; int iBlk; int iLastBlk; iBlk = fsPageToBlock(pFS, pArray->iFirst); - iLastBlk = fsPageToBlock(pFS, pArray->iLast); + iLastBlk = fsPageToBlock(pFS, pArray->iLastPg); lsmStringInit(&str, pDb->pEnv); lsmStringAppendf(&str, "%d", pArray->iFirst); while( iBlk!=iLastBlk ){ lsmStringAppendf(&str, " %d", fsLastPageOnBlock(pFS, iBlk)); fsBlockNext(pFS, iBlk, &iBlk); lsmStringAppendf(&str, " %d", fsFirstPageOnBlock(pFS, iBlk)); } - lsmStringAppendf(&str, " %d", pArray->iLast); + lsmStringAppendf(&str, " %d", pArray->iLastPg); *pzOut = str.z; } if( bUnlock ){ @@ -1497,16 +2140,15 @@ int nUsed, u8 *aUsed ){ if( pSeg ){ if( pSeg && pSeg->nSize>0 ){ - const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize); - + Pgno iLast = pSeg->iLastPg; int iBlk; int iLastBlk; iBlk = fsPageToBlock(pFS, pSeg->iFirst); - iLastBlk = fsPageToBlock(pFS, pSeg->iLast); + iLastBlk = fsPageToBlock(pFS, pSeg->iLastPg); while( iBlk ){ assert( iBlk<=nUsed ); /* assert( aUsed[iBlk-1]==0 ); */ aUsed[iBlk-1] = 1; @@ -1515,11 +2157,11 @@ }else{ iBlk = 0; } } - if( bExtra && (pSeg->iLast % nPagePerBlock)==0 ){ + if( bExtra && iLast==fsLastPageOnPagesBlock(pFS, iLast) ){ fsBlockNext(pFS, iLastBlk, &iBlk); aUsed[iBlk-1] = 1; } } } @@ -1537,25 +2179,25 @@ ** ** If no errors are found, non-zero is returned. If an error is found, an ** assert() fails. */ int lsmFsIntegrityCheck(lsm_db *pDb){ + FileSystem *pFS = pDb->pFS; int i; int j; Freelist freelist = {0, 0, 0}; - FileSystem *pFS = pDb->pFS; u8 *aUsed; Level *pLevel; Snapshot *pWorker = pDb->pWorker; int nBlock = pWorker->nBlock; aUsed = lsmMallocZero(pDb->pEnv, nBlock); if( aUsed==0 ){ /* Malloc has failed. Since this function is only called within debug - ** builds, this probably means the user is running an OOM injection test. - ** Regardless, it will not be possible to run the integrity-check at this - ** time, so assume the database is Ok and return non-zero. 
*/ + ** builds, this probably means the user is running an OOM injection test. + ** Regardless, it will not be possible to run the integrity-check at this + ** time, so assume the database is Ok and return non-zero. */ return 1; } for(pLevel=pWorker->pLevel; pLevel; pLevel=pLevel->pNext){ int i; @@ -1586,7 +2228,24 @@ for(i=0; ipEnv, aUsed); lsmFree(pDb->pEnv, freelist.aEntry); + return 1; } + +#ifndef NDEBUG +/* +** Return true if pPg happens to be the last page in segment pSeg. Or false +** otherwise. This function is only invoked as part of assert() conditions. +*/ +int lsmFsDbPageIsLast(Segment *pSeg, Page *pPg){ + if( pPg->pFS->pCompress ){ + Pgno iNext = 0; + int rc; + rc = fsNextPageOffset(pPg->pFS, pSeg, pPg->iPg, pPg->nCompress+6, &iNext); + return (rc!=LSM_OK || iNext==0); + } + return (pPg->iPg==pSeg->iLastPg); +} +#endif Index: src/lsm_main.c ================================================================== --- src/lsm_main.c +++ src/lsm_main.c @@ -321,10 +321,27 @@ }else{ pDb->bMultiProc = *piVal = (*piVal!=0); } break; } + + case LSM_CONFIG_SET_COMPRESSION: { + int *p = va_arg(ap, lsm_compress *); + if( pDb->pDatabase ){ + /* If lsm_open() has been called, this call is against the rules. 
*/ + rc = LSM_MISUSE_BKPT; + }else{ + memcpy(&pDb->compress, p, sizeof(lsm_compress)); + } + break; + } + + case LSM_CONFIG_GET_COMPRESSION: { + int *p = va_arg(ap, lsm_compress *); + memcpy(p, &pDb->compress, sizeof(lsm_compress)); + break; + } default: rc = LSM_MISUSE; break; } @@ -333,11 +350,11 @@ return rc; } void lsmAppendSegmentList(LsmString *pStr, char *zPre, Segment *pSeg){ lsmStringAppendf(pStr, "%s{%d %d %d %d}", zPre, - pSeg->iFirst, pSeg->iLast, pSeg->iRoot, pSeg->nSize + pSeg->iFirst, pSeg->iLastPg, pSeg->iRoot, pSeg->nSize ); } static int infoGetWorker(lsm_db *pDb, Snapshot **pp, int *pbUnlock){ int rc = LSM_OK; Index: src/lsm_sorted.c ================================================================== --- src/lsm_sorted.c +++ src/lsm_sorted.c @@ -27,11 +27,11 @@ ** the page and continuing backwards towards the start). All values are ** stored as unsigned big-endian integers. ** ** * Number of records on page (2 bytes). ** * Flags field (2 bytes). -** * Left-hand pointer value (4 bytes). +** * Left-hand pointer value (8 bytes). ** * The starting offset of each record (2 bytes per record). ** ** Records may span pages. Unless it happens to be an exact fit, the part ** of the final record that starts on page X that does not fit on page X ** is stored at the start of page (X+1). This means there may be pages where @@ -85,12 +85,12 @@ /* ** The following macros are used to access a page footer. */ #define SEGMENT_NRECORD_OFFSET(pgsz) ((pgsz) - 2) #define SEGMENT_FLAGS_OFFSET(pgsz) ((pgsz) - 2 - 2) -#define SEGMENT_POINTER_OFFSET(pgsz) ((pgsz) - 2 - 2 - 4) -#define SEGMENT_CELLPTR_OFFSET(pgsz, iCell) ((pgsz) - 2 - 2 - 4 - 2 - (iCell)*2) +#define SEGMENT_POINTER_OFFSET(pgsz) ((pgsz) - 2 - 2 - 8) +#define SEGMENT_CELLPTR_OFFSET(pgsz, iCell) ((pgsz) - 2 - 2 - 8 - 2 - (iCell)*2) #define SEGMENT_EOF(pgsz, nEntry) SEGMENT_CELLPTR_OFFSET(pgsz, nEntry) #define SEGMENT_BTREE_FLAG 0x0001 #define PGFTR_SKIP_NEXT_FLAG 0x0002 @@ -120,16 +120,16 @@ /* Current page. 
See segmentPtrLoadPage(). */ Page *pPg; /* Current page */ u16 flags; /* Copy of page flags field */ int nCell; /* Number of cells on pPg */ - int iPtr; /* Base cascade pointer */ + Pgno iPtr; /* Base cascade pointer */ /* Current cell. See segmentPtrLoadCell() */ int iCell; /* Current record within page pPg */ int eType; /* Type of current record */ - int iPgPtr; /* Cascade pointer offset */ + Pgno iPgPtr; /* Cascade pointer offset */ void *pKey; int nKey; /* Key associated with current record */ void *pVal; int nVal; /* Current record value (eType==WRITE only) */ /* Blobs used to allocate buffers for pKey and pVal as required */ Blob blob1; @@ -265,27 +265,46 @@ struct Hierarchy { Page **apHier; int nHier; }; +/* +** aSave: +** When mergeWorkerNextPage() is called to advance to the next page in +** the output segment, if the bStore flag for an element of aSave[] is +** true, it is cleared and the corresponding iPgno value is set to the +** page number of the page just completed. +** +** aSave[0] is used to record the pointer value to be pushed into the +** b-tree hierarchy. aSave[1] is used to save the page number of the +** page containing the indirect key most recently written to the b-tree. +** see mergeWorkerPushHierarchy() for details. 
+*/ struct MergeWorker { lsm_db *pDb; /* Database handle */ Level *pLevel; /* Worker snapshot Level being merged */ MultiCursor *pCsr; /* Cursor to read new segment contents from */ int bFlush; /* True if this is an in-memory tree flush */ Hierarchy hier; /* B-tree hierarchy under construction */ Page *pPage; /* Current output page */ int nWork; /* Number of calls to mergeWorkerNextPage() */ Pgno *aGobble; /* Gobble point for each input segment */ + + Pgno iIndirect; + struct SavedPgno { + Pgno iPgno; + int bStore; + } aSave[2]; }; #ifdef LSM_DEBUG_EXPENSIVE static int assertPointersOk(lsm_db *, Segment *, Segment *, int); static int assertBtreeOk(lsm_db *, Segment *); static void assertRunInOrder(lsm_db *pDb, Segment *pSeg); #else #define assertRunInOrder(x,y) +#define assertBtreeOk(x,y) #endif struct FilePage { u8 *aData; int nData; }; static u8 *fsPageData(Page *pPg, int *pnData){ @@ -319,10 +338,32 @@ return ((u32)aOut[0] << 24) + ((u32)aOut[1] << 16) + ((u32)aOut[2] << 8) + ((u32)aOut[3]); } + +u32 lsmGetU64(u8 *aOut){ + return ((u64)aOut[0] << 56) + + ((u64)aOut[1] << 48) + + ((u64)aOut[2] << 40) + + ((u64)aOut[3] << 32) + + ((u64)aOut[4] << 24) + + ((u32)aOut[5] << 16) + + ((u32)aOut[6] << 8) + + ((u32)aOut[7]); +} + +void lsmPutU64(u8 *aOut, u64 nVal){ + aOut[0] = (u8)((nVal>>56) & 0xFF); + aOut[1] = (u8)((nVal>>48) & 0xFF); + aOut[2] = (u8)((nVal>>40) & 0xFF); + aOut[3] = (u8)((nVal>>32) & 0xFF); + aOut[4] = (u8)((nVal>>24) & 0xFF); + aOut[5] = (u8)((nVal>>16) & 0xFF); + aOut[6] = (u8)((nVal>> 8) & 0xFF); + aOut[7] = (u8)((nVal ) & 0xFF); +} static int sortedBlobGrow(lsm_env *pEnv, Blob *pBlob, int nData){ assert( pBlob->pEnv==pEnv || (pBlob->pEnv==0 && pBlob->pData==0) ); if( pBlob->nAllocpData = lsmReallocOrFree(pEnv, pBlob->pData, nData); @@ -429,12 +470,12 @@ static int pageGetNRec(u8 *aData, int nData){ return (int)lsmGetU16(&aData[SEGMENT_NRECORD_OFFSET(nData)]); } -static int pageGetPtr(u8 *aData, int nData){ - return 
(int)lsmGetU32(&aData[SEGMENT_POINTER_OFFSET(nData)]); +static Pgno pageGetPtr(u8 *aData, int nData){ + return (Pgno)lsmGetU64(&aData[SEGMENT_POINTER_OFFSET(nData)]); } static int pageGetFlags(u8 *aData, int nData){ return (int)lsmGetU16(&aData[SEGMENT_FLAGS_OFFSET(nData)]); } @@ -454,17 +495,17 @@ /* ** Return the decoded (possibly relative) pointer value stored in cell ** iCell from page aData/nData. */ -static int pageGetRecordPtr(u8 *aData, int nData, int iCell){ - int iRet; /* Return value */ +static Pgno pageGetRecordPtr(u8 *aData, int nData, int iCell){ + Pgno iRet; /* Return value */ u8 *aCell; /* Pointer to cell iCell */ assert( iCell=0 ); aCell = pageGetCell(aData, nData, iCell); - lsmVarintGet32(&aCell[1], &iRet); + lsmVarintGet64(&aCell[1], &iRet); return iRet; } static u8 *pageGetKey( Page *pPg, /* Page to read from */ @@ -525,22 +566,23 @@ aData = fsPageData(pPg, &nData); aCell = pageGetCell(aData, nData, iKey); assert( aCell[0]==0 ); aCell++; - aCell += lsmVarintGet32(aCell, &iRef); - lsmVarintGet32(aCell, &iRef); + aCell += lsmVarintGet64(aCell, &iRef); + lsmVarintGet64(aCell, &iRef); assert( iRef>0 ); return iRef; } +#define GETVARINT64(a, i) (((i)=((u8*)(a))[0])<=240?1:lsmVarintGet64((a), &(i))) #define GETVARINT32(a, i) (((i)=((u8*)(a))[0])<=240?1:lsmVarintGet32((a), &(i))) static int pageGetBtreeKey( Page *pPg, int iKey, - int *piPtr, + Pgno *piPtr, int *piTopic, void **ppKey, int *pnKey, Blob *pBlob ){ @@ -553,17 +595,17 @@ assert( SEGMENT_BTREE_FLAG & pageGetFlags(aData, nData) ); assert( iKey>=0 && iKeypData; @@ -582,11 +624,11 @@ if( pCsr->iPg<0 ){ pCsr->pKey = 0; pCsr->nKey = 0; pCsr->eType = 0; }else{ - int dummy; + Pgno dummy; int iPg = pCsr->iPg; int iCell = pCsr->aPg[iPg].iCell; while( iCell<0 && (--iPg)>=0 ){ iCell = pCsr->aPg[iPg].iCell-1; } @@ -811,11 +853,10 @@ if( rc==LSM_OK && nDepth>1 ){ Blob blob = {0,0,0}; void *pSeek; int nSeek; int iTopicSeek; - int dummy; int iPg = 0; int iLoad = pCsr->pSeg->iRoot; Page *pPg = 
pCsr->aPg[nDepth-1].pPage; if( pageObjGetNRec(pPg)==0 ){ @@ -825,10 +866,11 @@ assert( iCell==-1 ); iTopicSeek = 1000; pSeek = 0; nSeek = 0; }else{ + Pgno dummy; rc = pageGetBtreeKey(pPg, 0, &dummy, &iTopicSeek, &pSeek, &nSeek, &pCsr->blob ); } @@ -853,11 +895,11 @@ while( iMax>=iMin ){ int iTry = (iMin+iMax)/2; void *pKey; int nKey; /* Key for cell iTry */ int iTopic; /* Topic for key pKeyT/nKeyT */ - int iPtr; /* Pointer for cell iTry */ + Pgno iPtr; /* Pointer for cell iTry */ int res; /* (pSeek - pKeyT) */ rc = pageGetBtreeKey(pPg, iTry, &iPtr, &iTopic, &pKey, &nKey,&blob); if( rc!=LSM_OK ) break; @@ -892,11 +934,11 @@ pBtreePg = &pCsr->aPg[pCsr->iPg]; aData = fsPageData(pBtreePg->pPage, &nData); pCsr->iPtr = btreeCursorPtr(aData, nData, pBtreePg->iCell+1); if( pBtreePg->iCell<0 ){ - int dummy; + Pgno dummy; int i; for(i=pCsr->iPg-1; i>=0; i--){ if( pCsr->aPg[i].iCell>0 ) break; } assert( i>=0 ); @@ -1005,11 +1047,11 @@ pPtr->iCell = iNew; aData = fsPageData(pPtr->pPg, &nPgsz); iOff = lsmGetU16(&aData[SEGMENT_CELLPTR_OFFSET(nPgsz, pPtr->iCell)]); pPtr->eType = aData[iOff]; iOff++; - iOff += GETVARINT32(&aData[iOff], pPtr->iPgPtr); + iOff += GETVARINT64(&aData[iOff], pPtr->iPgPtr); iOff += GETVARINT32(&aData[iOff], pPtr->nKey); if( rtIsWrite(pPtr->eType) ){ iOff += GETVARINT32(&aData[iOff], pPtr->nVal); } assert( pPtr->nKey>=0 ); @@ -1047,11 +1089,11 @@ aData = lsmFsPageData(pPg, &nData); if( pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG ){ void *pKey; int nKey; - int dummy; + Pgno dummy; rc = pageGetBtreeKey( pPg, pMerge->splitkey.iCell, &dummy, &iTopic, &pKey, &nKey, &blob ); if( rc==LSM_OK && blob.pData!=pKey ){ rc = sortedBlobSet(pEnv, &blob, pKey, nKey); @@ -1152,12 +1194,15 @@ int bLast, int *pRc ){ if( *pRc==LSM_OK ){ Page *pNew = 0; - Pgno iPg = (bLast ? 
pPtr->pSeg->iLast : pPtr->pSeg->iFirst); - *pRc = lsmFsDbPageGet(pFS, iPg, &pNew); + if( bLast ){ + *pRc = lsmFsDbPageLast(pFS, pPtr->pSeg, &pNew); + }else{ + *pRc = lsmFsDbPageGet(pFS, pPtr->pSeg->iFirst, &pNew); + } segmentPtrSetPage(pPtr, pNew); } } @@ -1532,11 +1577,11 @@ int (*xCmp)(void *, int, void *, int) = pCsr->pDb->xCmp; int res; /* Result of comparison operation */ int rc = LSM_OK; int iMin; int iMax; - int iPtrOut = 0; + Pgno iPtrOut = 0; const int iTopic = 0; /* If the current page contains an oversized entry, then there are no ** pointers to one or more of the subsequent pages in the sorted run. ** The following call ensures that the segment-ptr points to the correct @@ -1556,11 +1601,11 @@ assert( assertKeyLocation(pCsr, pPtr, pKey, nKey) ); #endif assert( pPtr->nCell>0 || pPtr->pSeg->nSize==1 - || lsmFsPageNumber(pPtr->pPg)==pPtr->pSeg->iLast + || lsmFsDbPageIsLast(pPtr->pSeg, pPtr->pPg) ); if( pPtr->nCell==0 ){ segmentPtrReset(pPtr); }else{ iMin = 0; @@ -1700,11 +1745,11 @@ iMax = nRec-1; while( iMax>=iMin ){ int iTry = (iMin+iMax)/2; void *pKeyT; int nKeyT; /* Key for cell iTry */ int iTopicT; /* Topic for key pKeyT/nKeyT */ - int iPtr; /* Pointer associated with cell iTry */ + Pgno iPtr; /* Pointer associated with cell iTry */ int res; /* (pKey - pKeyT) */ rc = pageGetBtreeKey(pPg, iTry, &iPtr, &iTopicT, &pKeyT, &nKeyT, &blob); if( rc!=LSM_OK ) break; if( piFirst && pKeyT==blob.pData ){ @@ -2126,19 +2171,21 @@ int nPtr = 0; int iPtr = 0; int rc = LSM_OK; for(pLvl=pSnap->pLevel; pLvl; pLvl=pLvl->pNext){ + if( pLvl->iAge<0 ) continue; nPtr += (1 + pLvl->nRight); } assert( pCsr->aPtr==0 ); pCsr->aPtr = lsmMallocZeroRc(pCsr->pDb->pEnv, sizeof(SegmentPtr) * nPtr, &rc); if( rc==LSM_OK ) pCsr->nPtr = nPtr; for(pLvl=pSnap->pLevel; pLvl && rc==LSM_OK; pLvl=pLvl->pNext){ int i; + if( pLvl->iAge<0 ) continue; pCsr->aPtr[iPtr].pLevel = pLvl; pCsr->aPtr[iPtr].pSeg = &pLvl->lhs; iPtr++; for(i=0; inRight; i++){ pCsr->aPtr[iPtr].pLevel = pLvl; @@ -2861,15 
+2908,13 @@ ){ Segment *pSeg; /* Segment being written */ lsm_db *pDb = pMW->pDb; /* Database handle */ int rc = LSM_OK; /* Return code */ int i; - int iRight = 0; Page **apHier = pMW->hier.apHier; int nHier = pMW->hier.nHier; - assert( nHier>0 && pMW->pLevel->pMerge->bHierReadonly ); pSeg = &pMW->pLevel->lhs; for(i=0; rc==LSM_OK && ipFS, pDb->pWorker, pSeg, &pNew); @@ -2892,18 +2937,16 @@ int nEntry = pageGetNRec(a2, n2); int iEof1 = SEGMENT_EOF(n1, nEntry); int iEof2 = SEGMENT_EOF(n2, nEntry); memcpy(a1, a2, iEof2); memcpy(&a1[iEof1], &a2[iEof2], n2 - iEof2); - if( iRight ) lsmPutU32(&a1[SEGMENT_POINTER_OFFSET(n1)], iRight); lsmFsPageRelease(apHier[i]); apHier[i] = pNew; - iRight = lsmFsPageNumber(pNew); }else{ lsmPutU16(&a1[SEGMENT_FLAGS_OFFSET(n1)], SEGMENT_BTREE_FLAG); lsmPutU16(&a1[SEGMENT_NRECORD_OFFSET(n1)], 0); - lsmPutU32(&a1[SEGMENT_POINTER_OFFSET(n1)], 0); + lsmPutU64(&a1[SEGMENT_POINTER_OFFSET(n1)], 0); i = i - 1; lsmFsPageRelease(pNew); } } } @@ -2912,13 +2955,10 @@ if( rc==LSM_OK ){ for(i=0; ipLevel->pMerge->bHierReadonly = 0; - } return rc; } /* ** Allocate and populate the MergeWorker.apHier[] array. 
@@ -2930,11 +2970,10 @@ pSeg = &pMW->pLevel->lhs; p = &pMW->hier; if( p->apHier==0 && pSeg->iRoot!=0 ){ - int bHierReadonly = pMW->pLevel->pMerge->bHierReadonly; FileSystem *pFS = pMW->pDb->pFS; lsm_env *pEnv = pMW->pDb->pEnv; Page **apHier = 0; int nHier = 0; int iPg = pSeg->iRoot; @@ -2956,11 +2995,10 @@ ); if( apNew==0 ){ rc = LSM_NOMEM_BKPT; break; } - if( bHierReadonly==0 ) lsmFsPageWrite(pPg); apHier = apNew; memmove(&apHier[1], &apHier[0], sizeof(Page *) * nHier); nHier++; apHier[0] = pPg; @@ -2970,12 +3008,17 @@ break; } }while( 1 ); if( rc==LSM_OK ){ + u8 *aData; + int nData; + aData = fsPageData(apHier[0], &nData); + pMW->aSave[0].iPgno = pageGetPtr(aData, nData); p->nHier = nHier; p->apHier = apHier; + rc = mergeWorkerMoveHierarchy(pMW, 0); }else{ int i; for(i=0; ipLevel->lhs; + Hierarchy *p = &pMW->hier; lsm_db *pDb = pMW->pDb; /* Database handle */ - int rc; /* Return Code */ + int rc = LSM_OK; /* Return Code */ int iLevel; /* Level of b-tree hierachy to write to */ int nData; /* Size of aData[] in bytes */ u8 *aData; /* Page data for level iLevel */ int iOff; /* Offset on b-tree page to write record to */ int nRec; /* Initial number of records on b-tree page */ - Pgno iPtr; /* Pointer value to accompany pKey/nKey */ - int bIndirect; /* True to use an indirect record */ - - Hierarchy *p; - Segment *pSeg; - - /* If there exists a b-tree hierarchy and it is not loaded into - ** memory, load it now. */ - pSeg = &pMW->pLevel->lhs; - p = &pMW->hier; - rc = mergeWorkerLoadHierarchy(pMW); - - /* Obtain the absolute pointer value to store along with the key in the - ** page body. This pointer points to a page that contains keys that are - ** smaller than pKey/nKey. 
*/ - if( p->nHier ){ - aData = fsPageData(p->apHier[0], &nData); - iPtr = lsmGetU32(&aData[SEGMENT_POINTER_OFFSET(nData)]); - }else{ - iPtr = pSeg->iFirst; - } - - if( p->nHier && pMW->pLevel->pMerge->bHierReadonly ){ - rc = mergeWorkerMoveHierarchy(pMW, bSep); - if( rc!=LSM_OK ) goto push_hierarchy_out; - } - - /* Determine if the indirect format should be used. */ - bIndirect = (nKey*4 > lsmFsPageSize(pMW->pDb->pFS)); + + /* iKeyPg should be zero for an ordinary b-tree key, or non-zero for an + ** indirect key. The flags byte for an indirect key is 0x00. */ + assert( (eType==0)==(iKeyPg!=0) ); /* The MergeWorker.apHier[] array contains the right-most leaf of the b-tree ** hierarchy, the root node, and all nodes that lie on the path between. ** apHier[0] is the right-most leaf and apHier[pMW->nHier-1] is the current ** root page. ** ** This loop searches for a node with enough space to store the key on, ** starting with the leaf and iterating up towards the root. When the loop - ** exits, the key may be written to apHier[iLevel]. - */ + ** exits, the key may be written to apHier[iLevel]. */ for(iLevel=0; iLevel<=p->nHier; iLevel++){ int nByte; /* Number of free bytes required */ - int iRight; /* Right hand pointer from aData[]/nData */ if( iLevel==p->nHier ){ /* Extend the array and allocate a new root page. */ Page **aNew; aNew = (Page **)lsmRealloc( pMW->pDb->pEnv, p->apHier, sizeof(Page *)*(p->nHier+1) ); if( !aNew ){ - rc = LSM_NOMEM_BKPT; - goto push_hierarchy_out; + return LSM_NOMEM_BKPT; } p->apHier = aNew; }else{ + Page *pOld; int nFree; - /* If the key will fit on this page, break out of the loop. */ - assert( lsmFsPageWritable(p->apHier[iLevel]) ); - aData = fsPageData(p->apHier[iLevel], &nData); - iRight = lsmGetU32(&aData[SEGMENT_POINTER_OFFSET(nData)]); - if( bIndirect ){ - nByte = 2 + 1 + lsmVarintLen32(iRight) + lsmVarintLen32(iKeyPg); + /* If the key will fit on this page, break out of the loop here. 
+ ** The new entry will be written to page apHier[iLevel]. */ + pOld = p->apHier[iLevel]; + assert( lsmFsPageWritable(pOld) ); + aData = fsPageData(pOld, &nData); + if( eType==0 ){ + nByte = 2 + 1 + lsmVarintLen32(iPtr) + lsmVarintLen32(iKeyPg); }else{ - nByte = 2 + 1 + lsmVarintLen32(iRight) + lsmVarintLen32(nKey) + nKey; + nByte = 2 + 1 + lsmVarintLen32(iPtr) + lsmVarintLen32(nKey) + nKey; } nRec = pageGetNRec(aData, nData); nFree = SEGMENT_EOF(nData, nRec) - mergeWorkerPageOffset(aData, nData); if( nByte<=nFree ) break; - /* Otherwise, it is full. Release it. */ - iPtr = lsmFsPageNumber(p->apHier[iLevel]); - rc = lsmFsPageRelease(p->apHier[iLevel]); + /* Otherwise, this page is full. Set the right-hand-child pointer + ** to iPtr and release it. */ + lsmPutU64(&aData[SEGMENT_POINTER_OFFSET(nData)], iPtr); + rc = lsmFsPagePersist(pOld); + if( rc==LSM_OK ){ + iPtr = lsmFsPageNumber(pOld); + lsmFsPageRelease(pOld); + } } /* Allocate a new page for apHier[iLevel]. */ p->apHier[iLevel] = 0; if( rc==LSM_OK ){ rc = lsmFsSortedAppend( pDb->pFS, pDb->pWorker, pSeg, &p->apHier[iLevel] ); } - if( rc!=LSM_OK ) goto push_hierarchy_out; + if( rc!=LSM_OK ) return rc; aData = fsPageData(p->apHier[iLevel], &nData); memset(aData, 0, nData); lsmPutU16(&aData[SEGMENT_FLAGS_OFFSET(nData)], SEGMENT_BTREE_FLAG); lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], 0); - if( iLevel>0 ){ - iRight = lsmFsPageNumber(p->apHier[iLevel-1]); - lsmPutU32(&aData[SEGMENT_POINTER_OFFSET(nData)], iRight); - } if( iLevel==p->nHier ){ p->nHier++; break; } } /* Write the key into page apHier[iLevel]. 
*/ aData = fsPageData(p->apHier[iLevel], &nData); - iOff = mergeWorkerPageOffset(aData, nData); - nRec = pageGetNRec(aData, nData); lsmPutU16(&aData[SEGMENT_CELLPTR_OFFSET(nData, nRec)], iOff); lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], nRec+1); - - if( bIndirect ){ + if( eType==0 ){ aData[iOff++] = 0x00; iOff += lsmVarintPut32(&aData[iOff], iPtr); iOff += lsmVarintPut32(&aData[iOff], iKeyPg); }else{ - aData[iOff++] = (u8)(iTopic | LSM_SEPARATOR); + aData[iOff++] = eType; iOff += lsmVarintPut32(&aData[iOff], iPtr); iOff += lsmVarintPut32(&aData[iOff], nKey); memcpy(&aData[iOff], pKey, nKey); } - if( iLevel>0 ){ - int iRight = lsmFsPageNumber(p->apHier[iLevel-1]); - lsmPutU32(&aData[SEGMENT_POINTER_OFFSET(nData)], iRight); - } - - /* Write the right-hand pointer of the right-most leaf page of the - ** b-tree heirarchy. */ - aData = fsPageData(p->apHier[0], &nData); - lsmPutU32(&aData[SEGMENT_POINTER_OFFSET(nData)], iKeyPg); + return rc; +} + +static int mergeWorkerBtreeIndirect(MergeWorker *pMW){ + int rc = LSM_OK; + if( pMW->iIndirect ){ + Pgno iKeyPg = pMW->aSave[1].iPgno; + rc = mergeWorkerBtreeWrite(pMW, 0, pMW->iIndirect, iKeyPg, 0, 0); + pMW->iIndirect = 0; + } + return rc; +} + +/* +** Append the database key (iTopic/pKey/nKey) to the b-tree under +** construction. This key has not yet been written to a segment page. +** The pointer that will accompany the new key in the b-tree - that +** points to the completed segment page that contains keys smaller than +** (pKey/nKey) is currently stored in pMW->aSave[0].iPgno. 
+*/ +static int mergeWorkerPushHierarchy( + MergeWorker *pMW, /* Merge worker object */ + int iTopic, /* Topic value for this key */ + void *pKey, /* Pointer to key buffer */ + int nKey /* Size of pKey buffer in bytes */ +){ + lsm_db *pDb = pMW->pDb; /* Database handle */ + int rc = LSM_OK; /* Return Code */ + int iLevel; /* Level of b-tree hierachy to write to */ + int nData; /* Size of aData[] in bytes */ + u8 *aData; /* Page data for level iLevel */ + int iOff; /* Offset on b-tree page to write record to */ + int nRec; /* Initial number of records on b-tree page */ + Pgno iPtr; /* Pointer value to accompany pKey/nKey */ + + Hierarchy *p; + Segment *pSeg; + + /* If there exists a b-tree hierarchy and it is not loaded into + ** memory, load it now. */ + pSeg = &pMW->pLevel->lhs; + p = &pMW->hier; + + assert( pMW->aSave[0].bStore==0 ); + assert( pMW->aSave[1].bStore==0 ); + rc = mergeWorkerBtreeIndirect(pMW); + + /* Obtain the absolute pointer value to store along with the key in the + ** page body. This pointer points to a page that contains keys that are + ** smaller than pKey/nKey. */ + iPtr = pMW->aSave[0].iPgno; + assert( iPtr!=0 ); + + /* Determine if the indirect format should be used. */ + if( (nKey*4 > lsmFsPageSize(pMW->pDb->pFS)) ){ + pMW->iIndirect = iPtr; + pMW->aSave[1].bStore = 1; + }else{ + rc = mergeWorkerBtreeWrite( + pMW, (u8)(iTopic | LSM_SEPARATOR), iPtr, 0, pKey, nKey + ); + } /* Ensure that the SortedRun.iRoot field is correct. 
*/ - pSeg->iRoot = lsmFsPageNumber(p->apHier[p->nHier-1]); + return rc; +} -push_hierarchy_out: +static int mergeWorkerFinishHierarchy( + MergeWorker *pMW /* Merge worker object */ +){ + int i; /* Used to loop through apHier[] */ + int rc = LSM_OK; /* Return code */ + Pgno iPtr; /* New right-hand-child pointer value */ + + iPtr = pMW->aSave[0].iPgno; + for(i=0; ihier.nHier && rc==LSM_OK; i++){ + Page *pPg = pMW->hier.apHier[i]; + int nData; /* Size of aData[] in bytes */ + u8 *aData; /* Page data for pPg */ + + aData = fsPageData(pPg, &nData); + lsmPutU64(&aData[SEGMENT_POINTER_OFFSET(nData)], iPtr); + + rc = lsmFsPagePersist(pPg); + iPtr = lsmFsPageNumber(pPg); + lsmFsPageRelease(pPg); + } + + if( pMW->hier.nHier ){ + pMW->pLevel->lhs.iRoot = iPtr; + lsmFree(pMW->pDb->pEnv, pMW->hier.apHier); + pMW->hier.apHier = 0; + pMW->hier.nHier = 0; + } + return rc; } + +static int mergeWorkerAddPadding( + MergeWorker *pMW /* Merge worker object */ +){ + FileSystem *pFS = pMW->pDb->pFS; + return lsmFsSortedPadding(pFS, pMW->pDb->pWorker, &pMW->pLevel->lhs); +} static int keyszToSkip(FileSystem *pFS, int nKey){ int nPgsz; /* Nominal database page size */ nPgsz = lsmFsPageSize(pFS); return LSM_MIN(((nKey * 4) / nPgsz), 3); } + +/* +** Release the reference to the current output page of merge-worker *pMW +** (reference pMW->pPage). Set the page number values in aSave[] as +** required (see comments above struct MergeWorker for details). +*/ +static int mergeWorkerPersistAndRelease(MergeWorker *pMW){ + int rc; + int i; + + assert( pMW->pPage || (pMW->aSave[0].bStore==0 && pMW->aSave[1].bStore==0) ); + + /* Persist the page */ + rc = lsmFsPagePersist(pMW->pPage); + + /* If required, save the page number. */ + for(i=0; i<2; i++){ + if( pMW->aSave[i].bStore ){ + pMW->aSave[i].iPgno = lsmFsPageNumber(pMW->pPage); + pMW->aSave[i].bStore = 0; + } + } + + /* Release the completed output page. 
*/ + lsmFsPageRelease(pMW->pPage); + pMW->pPage = 0; + return rc; +} /* ** Advance to the next page of an output run being populated by merge-worker ** pMW. The footer of the new page is initialized to indicate that it contains ** zero records. The flags field is cleared. The page footer pointer field @@ -3193,34 +3324,33 @@ ** ** If successful, LSM_OK is returned. Otherwise, an error code. */ static int mergeWorkerNextPage( MergeWorker *pMW, /* Merge worker object to append page to */ - int iFPtr /* Pointer value for footer of new page */ + Pgno iFPtr /* Pointer value for footer of new page */ ){ int rc = LSM_OK; /* Return code */ Page *pNext = 0; /* New page appended to run */ lsm_db *pDb = pMW->pDb; /* Database handle */ Segment *pSeg; /* Run to append to */ pSeg = &pMW->pLevel->lhs; rc = lsmFsSortedAppend(pDb->pFS, pDb->pWorker, pSeg, &pNext); - assert( rc!=LSM_OK || pSeg->iFirst>0 ); + assert( rc!=LSM_OK || pSeg->iFirst>0 || pMW->pDb->compress.xCompress ); if( rc==LSM_OK ){ u8 *aData; /* Data buffer belonging to page pNext */ int nData; /* Size of aData[] in bytes */ - lsmFsPageRelease(pMW->pPage); + rc = mergeWorkerPersistAndRelease(pMW); + pMW->pPage = pNext; pMW->pLevel->pMerge->iOutputOff = 0; - aData = fsPageData(pNext, &nData); lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], 0); lsmPutU16(&aData[SEGMENT_FLAGS_OFFSET(nData)], 0); - lsmPutU32(&aData[SEGMENT_POINTER_OFFSET(nData)], iFPtr); - + lsmPutU64(&aData[SEGMENT_POINTER_OFFSET(nData)], iFPtr); pMW->nWork++; } return rc; } @@ -3268,45 +3398,86 @@ } return rc; } + +/* +** The MergeWorker passed as the only argument is working to merge two or +** more existing segments together (not to flush an in-memory tree). It +** has not yet written the first key to the first page of the output. 
+*/ +static int mergeWorkerFirstPage(MergeWorker *pMW){ + int rc = LSM_OK; /* Return code */ + Page *pPg = 0; /* First page of run pSeg */ + int iFPtr = 0; /* Pointer value read from footer of pPg */ + MultiCursor *pCsr = pMW->pCsr; + + assert( pMW->pPage==0 ); + + if( pCsr->pBtCsr ){ + rc = LSM_OK; + iFPtr = pMW->pLevel->pNext->lhs.iFirst; + }else if( pCsr->nPtr>0 ){ + Segment *pSeg; + pSeg = pCsr->aPtr[pCsr->nPtr-1].pSeg; + rc = lsmFsDbPageGet(pMW->pDb->pFS, pSeg->iFirst, &pPg); + if( rc==LSM_OK ){ + u8 *aData; /* Buffer for page pPg */ + int nData; /* Size of aData[] in bytes */ + aData = fsPageData(pPg, &nData); + iFPtr = pageGetPtr(aData, nData); + lsmFsPageRelease(pPg); + } + } + + if( rc==LSM_OK ){ + rc = mergeWorkerNextPage(pMW, iFPtr); + if( pCsr->pPrevMergePtr ) *pCsr->pPrevMergePtr = iFPtr; + pMW->aSave[0].bStore = 1; + } + + return rc; +} static int mergeWorkerWrite( MergeWorker *pMW, /* Merge worker object to write into */ int eType, /* One of SORTED_SEPARATOR, WRITE or DELETE */ void *pKey, int nKey, /* Key value */ MultiCursor *pCsr, /* Read value (if any) from here */ - int iPtr, /* Absolute value of page pointer, or 0 */ - int *piPtrOut /* OUT: Pointer to write to separators */ + int iPtr /* Absolute value of page pointer, or 0 */ ){ int rc = LSM_OK; /* Return code */ Merge *pMerge; /* Persistent part of level merge state */ int nHdr; /* Space required for this record header */ Page *pPg; /* Page to write to */ u8 *aData; /* Data buffer for page pWriter->pPage */ int nData; /* Size of buffer aData[] in bytes */ int nRec; /* Number of records on page pPg */ int iFPtr; /* Value of pointer in footer of pPg */ - int iRPtr; /* Value of pointer written into record */ + int iRPtr = 0; /* Value of pointer written into record */ int iOff; /* Current write offset within page pPg */ Segment *pSeg; /* Segment being written */ int flags = 0; /* If != 0, flags value for page footer */ + int bFirst = 0; /* True for first key of output run */ void *pVal; int nVal; 
pMerge = pMW->pLevel->pMerge; pSeg = &pMW->pLevel->lhs; + if( pSeg->iFirst==0 && pMW->pPage==0 ){ + rc = mergeWorkerFirstPage(pMW); + bFirst = 1; + } pPg = pMW->pPage; - aData = fsPageData(pPg, &nData); - nRec = pageGetNRec(aData, nData); - iFPtr = pageGetPtr(aData, nData); - - /* Calculate the relative pointer value to write to this record */ - iRPtr = iPtr - iFPtr; - /* assert( iRPtr>=0 ); */ + if( pPg ){ + aData = fsPageData(pPg, &nData); + nRec = pageGetNRec(aData, nData); + iFPtr = pageGetPtr(aData, nData); + iRPtr = iPtr - iFPtr; + } /* Figure out how much space is required by the new record. The space ** required is divided into two sections: the header and the body. The ** header consists of the intial varint fields. The body are the blobs ** of data that correspond to the key and value data. The entire header @@ -3316,56 +3487,47 @@ ** The header space is: ** ** 1) record type - 1 byte. ** 2) Page-pointer-offset - 1 varint ** 3) Key size - 1 varint - ** 4) Value size - 1 varint (SORTED_WRITE only) + ** 4) Value size - 1 varint (only if LSM_INSERT flag is set) */ rc = lsmMCursorValue(pCsr, &pVal, &nVal); if( rc==LSM_OK ){ nHdr = 1 + lsmVarintLen32(iRPtr) + lsmVarintLen32(nKey); if( rtIsWrite(eType) ) nHdr += lsmVarintLen32(nVal); /* If the entire header will not fit on page pPg, or if page pPg is - ** marked read-only, advance to the next page of the output run. */ + ** marked read-only, advance to the next page of the output run. 
*/ iOff = pMerge->iOutputOff; - if( iOff<0 || iOff+nHdr > SEGMENT_EOF(nData, nRec+1) ){ + if( iOff<0 || pPg==0 || iOff+nHdr > SEGMENT_EOF(nData, nRec+1) ){ iFPtr = *pCsr->pPrevMergePtr; iRPtr = iPtr - iFPtr; - iOff = 0; nRec = 0; rc = mergeWorkerNextPage(pMW, iFPtr); pPg = pMW->pPage; } } /* If this record header will be the first on the page, and the page is - ** not the very first in the entire run, special actions may need to be - ** taken: - ** - ** * If currently writing the main run, *piPtrOut should be set to - ** the current page number. The caller will add a key to the separators - ** array that points to the current page. - ** - ** * If currently writing the separators array, push a copy of the key - ** into the b-tree hierarchy. + ** not the very first in the entire run, add a copy of the key to the + ** b-tree hierarchy. */ - if( rc==LSM_OK && nRec==0 && pSeg->iFirst!=pSeg->iLast ){ + if( rc==LSM_OK && nRec==0 && bFirst==0 ){ assert( pMerge->nSkip>=0 ); if( pMerge->nSkip==0 ){ - Pgno iPg = lsmFsPageNumber(pPg); - rc = mergeWorkerPushHierarchy(pMW, 0, iPg, rtTopic(eType), pKey, nKey); - } - if( pMerge->nSkip ){ + rc = mergeWorkerPushHierarchy(pMW, rtTopic(eType), pKey, nKey); + assert( pMW->aSave[0].bStore==0 ); + pMW->aSave[0].bStore = 1; + pMerge->nSkip = keyszToSkip(pMW->pDb->pFS, nKey); + }else{ pMerge->nSkip--; flags = PGFTR_SKIP_THIS_FLAG; - }else{ - *piPtrOut = lsmFsPageNumber(pPg); - pMerge->nSkip = keyszToSkip(pMW->pDb->pFS, nKey); } + if( pMerge->nSkip ) flags |= PGFTR_SKIP_NEXT_FLAG; } /* Update the output segment */ if( rc==LSM_OK ){ @@ -3420,10 +3582,11 @@ */ static void mergeWorkerShutdown(MergeWorker *pMW, int *pRc){ int i; /* Iterator variable */ int rc = *pRc; MultiCursor *pCsr = pMW->pCsr; + Hierarchy *p = &pMW->hier; /* Unless the merge has finished, save the cursor position in the ** Merge.aInput[] array. See function mergeWorkerInit() for the ** code to restore a cursor position based on aInput[]. 
*/ if( rc==LSM_OK && pCsr && lsmMCursorValid(pCsr) ){ @@ -3455,62 +3618,29 @@ if( iPtrnPtr ){ pMerge->splitkey = pMerge->aInput[iPtr]; }else{ btreeCursorSplitkey(pCsr->pBtCsr, &pMerge->splitkey); } + + pMerge->iOutputOff = -1; } lsmMCursorClose(pCsr); - lsmFsPageRelease(pMW->pPage); - - for(i=0; i<2; i++){ - Hierarchy *p = &pMW->hier; - int iPg; - for(iPg=0; iPgnHier; iPg++){ - int rc2 = lsmFsPageRelease(p->apHier[iPg]); - if( rc==LSM_OK ) rc = rc2; - } - lsmFree(pMW->pDb->pEnv, p->apHier); - p->apHier = 0; - p->nHier = 0; - } - + + /* Persist and release the output page. */ + if( rc==LSM_OK ) rc = mergeWorkerPersistAndRelease(pMW); + if( rc==LSM_OK ) rc = mergeWorkerBtreeIndirect(pMW); + if( rc==LSM_OK ) rc = mergeWorkerFinishHierarchy(pMW); + if( rc==LSM_OK ) rc = mergeWorkerAddPadding(pMW); + + lsmFree(pMW->pDb->pEnv, pMW->aGobble); + pMW->aGobble = 0; pMW->pCsr = 0; pMW->pPage = 0; pMW->pPage = 0; -} - -static int mergeWorkerFirstPage(MergeWorker *pMW){ - int rc; /* Return code */ - Page *pPg = 0; /* First page of run pSeg */ - int iFPtr; /* Pointer value read from footer of pPg */ - MultiCursor *pCsr = pMW->pCsr; - - assert( pMW->pPage==0 ); - - if( pCsr->pBtCsr ){ - rc = LSM_OK; - iFPtr = pMW->pLevel->pNext->lhs.iFirst; - }else{ - Segment *pSeg; - pSeg = pMW->pCsr->aPtr[pMW->pCsr->nPtr-1].pSeg; - rc = lsmFsDbPageGet(pMW->pDb->pFS, pSeg->iFirst, &pPg); - if( rc==LSM_OK ){ - u8 *aData; /* Buffer for page pPg */ - int nData; /* Size of aData[] in bytes */ - aData = fsPageData(pPg, &nData); - iFPtr = pageGetPtr(aData, nData); - lsmFsPageRelease(pPg); - } - } - - if( rc==LSM_OK ){ - rc = mergeWorkerNextPage(pMW, iFPtr); - if( pCsr->pPrevMergePtr ) *pCsr->pPrevMergePtr = iFPtr; - } - - return rc; + + *pRc = rc; } /* ** The cursor passed as the first argument is being used as the input for ** a merge operation. 
When this function is called, *piFlags contains the @@ -3606,19 +3736,13 @@ /* If this is a separator key and we know that the output pointer has not ** changed, there is no point in writing an output record. Otherwise, ** proceed. */ if( rtIsSeparator(eType)==0 || iPtr!=0 ){ - int iSPtr = 0; /* Separators require a pointer here */ - - if( pMW->pPage==0 ){ - rc = mergeWorkerFirstPage(pMW); - } - /* Write the record into the main run. */ if( rc==LSM_OK ){ - rc = mergeWorkerWrite(pMW, eType, pKey, nKey, pCsr, iPtr, &iSPtr); + rc = mergeWorkerWrite(pMW, eType, pKey, nKey, pCsr, iPtr); } } } /* Advance the cursor to the next input record (assuming one exists). */ @@ -3629,25 +3753,27 @@ ** references currently held by the merge worker and inform the ** FileSystem object that no further pages will be appended to either ** the main or separators array. */ if( rc==LSM_OK && !lsmMCursorValid(pMW->pCsr) ){ + + mergeWorkerShutdown(pMW, &rc); if( pSeg->iFirst ){ rc = lsmFsSortedFinish(pDb->pFS, pSeg); } #ifdef LSM_DEBUG_EXPENSIVE if( rc==LSM_OK ){ +#if 0 rc = assertBtreeOk(pDb, pSeg); if( pMW->pCsr->pBtCsr ){ Segment *pNext = &pMW->pLevel->pNext->lhs; rc = assertPointersOk(pDb, pSeg, pNext, 0); } - } #endif - - mergeWorkerShutdown(pMW, &rc); + } +#endif } return rc; } static int mergeWorkerDone(MergeWorker *pMW){ @@ -3678,11 +3804,10 @@ int rc = LSM_OK; /* Return Code */ MultiCursor *pCsr = 0; Level *pNext = 0; /* The current top level */ Level *pNew; /* The new level itself */ Segment *pDel = 0; /* Delete separators from this segment */ - Pgno iLeftPtr = 0; int nWrite = 0; /* Number of database pages written */ assert( pnOvfl ); /* Allocate the new level structure to write to. 
*/ @@ -3702,44 +3827,47 @@ multiCursorVisitFreelist(pCsr, pnOvfl); rc = multiCursorAddTree(pCsr, pDb->pWorker, eTree); if( rc==LSM_OK && pNext && pNext->pMerge==0 && pNext->lhs.iRoot ){ pDel = &pNext->lhs; rc = btreeCursorNew(pDb, pDel, &pCsr->pBtCsr); - iLeftPtr = pNext->lhs.iFirst; + } + + if( pNext==0 ){ + multiCursorIgnoreDelete(pCsr); } } if( rc!=LSM_OK ){ lsmMCursorClose(pCsr); }else{ + Pgno iLeftPtr = 0; Merge merge; /* Merge object used to create new level */ MergeWorker mergeworker; /* MergeWorker object for the same purpose */ memset(&merge, 0, sizeof(Merge)); memset(&mergeworker, 0, sizeof(MergeWorker)); pNew->pMerge = &merge; + pNew->iAge = -1; mergeworker.pDb = pDb; mergeworker.pLevel = pNew; mergeworker.pCsr = pCsr; pCsr->pPrevMergePtr = &iLeftPtr; /* Mark the separators array for the new level as a "phantom". */ mergeworker.bFlush = 1; - /* Allocate the first page of the output segment. */ - rc = mergeWorkerNextPage(&mergeworker, iLeftPtr); - /* Do the work to create the new merged segment on disk */ if( rc==LSM_OK ) rc = lsmMCursorFirst(pCsr); while( rc==LSM_OK && mergeWorkerDone(&mergeworker)==0 ){ rc = mergeWorkerStep(&mergeworker); } nWrite = mergeworker.nWork; mergeWorkerShutdown(&mergeworker, &rc); pNew->pMerge = 0; + pNew->iAge = 0; } /* Link the new level into the top of the tree. 
*/ if( rc==LSM_OK ){ if( pDel ) pDel->iRoot = 0; @@ -3746,17 +3874,18 @@ }else{ lsmDbSnapshotSetLevel(pDb->pWorker, pNext); sortedFreeLevel(pDb->pEnv, pNew); } - if( rc==LSM_OK ){ - sortedInvokeWorkHook(pDb); - } - #if 0 lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 0, "new-toplevel"); #endif + + if( rc==LSM_OK ){ + assertBtreeOk(pDb, &pNew->lhs); + sortedInvokeWorkHook(pDb); + } if( pnWrite ) *pnWrite = nWrite; pDb->pWorker->nWrite += nWrite; return rc; } @@ -3840,40 +3969,10 @@ *ppNew = pNew; return rc; } -static int mergeWorkerLoadOutputPage(MergeWorker *pMW){ - int rc = LSM_OK; /* Return code */ - Segment *pSeg; /* Run to load page from */ - Level *pLevel; - - pLevel = pMW->pLevel; - pSeg = &pLevel->lhs; - if( pSeg->iLast ){ - Page *pPg; - rc = lsmFsDbPageGet(pMW->pDb->pFS, pSeg->iLast, &pPg); - - while( rc==LSM_OK ){ - Page *pNext; - u8 *aData; - int nData; - aData = fsPageData(pPg, &nData); - if( (pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG)==0 ) break; - rc = lsmFsDbPageNext(pSeg, pPg, -1, &pNext); - lsmFsPageRelease(pPg); - pPg = pNext; - } - - if( rc==LSM_OK ){ - pMW->pPage = pPg; - if( pLevel->pMerge->iOutputOff>=0 ) rc = lsmFsPageWrite(pPg); - } - } - return rc; -} - static int mergeWorkerInit( lsm_db *pDb, /* Db connection to do merge work */ Level *pLevel, /* Level to work on merging */ MergeWorker *pMW /* Object to initialize */ ){ @@ -3914,17 +4013,20 @@ } assert( rc!=LSM_OK || pMerge->nInput==(pCsr->nPtr+(pCsr->pBtCsr!=0)) ); pMW->pCsr = pCsr; - /* Load the current output page into memory. */ - if( rc==LSM_OK ) rc = mergeWorkerLoadOutputPage(pMW); + /* Load the b-tree hierarchy into memory. */ + if( rc==LSM_OK ) rc = mergeWorkerLoadHierarchy(pMW); + if( rc==LSM_OK && pMW->hier.nHier==0 ){ + pMW->aSave[0].iPgno = pLevel->lhs.iFirst; + } /* Position the cursor. */ if( rc==LSM_OK ){ pCsr->pPrevMergePtr = &pMerge->iCurrentPtr; - if( pMW->pPage==0 ){ + if( pLevel->lhs.iFirst==0 ){ /* The output array is still empty. 
So position the cursor at the very ** start of the input. */ rc = multiCursorEnd(pCsr, 0); }else{ /* The output array is non-empty. Position the cursor based on the @@ -3957,10 +4059,11 @@ } return rc; } +/* TODO: Re-enable this!!! */ static int sortedBtreeGobble( lsm_db *pDb, MultiCursor *pCsr, int iGobble ){ @@ -3976,11 +4079,12 @@ if( rc==LSM_OK ){ rc = seekInBtree(pCsr, pSeg, p->pData, p->nData, aPg, 0); } for(nPg=0; aPg[nPg]; nPg++); -#if 1 + +#if 0 lsmFsGobble(pDb, pSeg, aPg, nPg); #endif lsmFree(pDb->pEnv, aPg); } @@ -4177,10 +4281,11 @@ if( rc==LSM_OK ) sortedInvokeWorkHook(pDb); #if 0 lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 0, "work"); #endif + assertBtreeOk(pDb, &pLevel->lhs); assertRunInOrder(pDb, &pLevel->lhs); /* If bFlush is true and the database is no longer considered "full", ** break out of the loop even if nRemaining is still greater than ** zero. The caller has an in-memory tree to flush to disk. */ @@ -4198,32 +4303,36 @@ } /* ** The database connection passed as the first argument must be a worker ** connection. This function checks if there exists an "old" in-memory tree -** ready to be flushed to disk. If so, *pbOut is set to true before -** returning. Otherwise false. +** ready to be flushed to disk. If so, true is returned. Otherwise false. ** -** Normally, LSM_OK is returned. Or, if an error occurs, an LSM error code. +** If an error occurs, *pRc is set to an LSM error code before returning. +** It is assumed that *pRc is set to LSM_OK when this function is called. 
*/ -static int sortedTreeHasOld(lsm_db *pDb, int *pbOut){ +static int sortedTreeHasOld(lsm_db *pDb, int *pRc){ int rc = LSM_OK; + int bRet = 0; assert( pDb->pWorker ); - if( pDb->nTransOpen==0 ){ - rc = lsmTreeLoadHeader(pDb, 0); - } - - if( rc==LSM_OK - && pDb->treehdr.iOldShmid - && pDb->treehdr.iOldLog!=pDb->pWorker->iLogOff - ){ - *pbOut = 1; - }else{ - *pbOut = 0; - } - return rc; + if( *pRc==LSM_OK ){ + if( pDb->nTransOpen==0 ){ + rc = lsmTreeLoadHeader(pDb, 0); + } + if( rc==LSM_OK + && pDb->treehdr.iOldShmid + && pDb->treehdr.iOldLog!=pDb->pWorker->iLogOff + ){ + bRet = 1; + }else{ + bRet = 0; + } + *pRc = rc; + } + assert( *pRc==LSM_OK || bRet==0 ); + return bRet; } static int doLsmSingleWork( lsm_db *pDb, int bShutdown, @@ -4242,10 +4351,11 @@ /* Open the worker 'transaction'. It will be closed before this function ** returns. */ assert( pDb->pWorker==0 ); rc = lsmBeginWork(pDb); + assert( rc!=8 ); if( rc!=LSM_OK ) return rc; /* If this connection is doing auto-checkpoints, set nMax (and nRem) so ** that this call stops writing when the auto-checkpoint is due. */ if( bShutdown==0 && pDb->nAutockpt ){ @@ -4263,35 +4373,29 @@ bCkpt = 1; nRem = LSM_MAX(nMax, 0); } } - /* If the FLUSH flag is set, there exists in-memory ready to be flushed - ** to disk and there are lsm_db.nMerge or fewer age=0 levels, flush the - ** data to disk now. */ - if( (flags & LSM_WORK_FLUSH) ){ - int bOld; - rc = sortedTreeHasOld(pDb, &bOld); - if( bOld ){ - if( sortedDbIsFull(pDb) ){ - int nPg = 0; - rc = sortedWork(pDb, nRem, 0, 1, &nPg); - nRem -= nPg; - assert( rc!=LSM_OK || nRem<=0 || !sortedDbIsFull(pDb) ); - bToplevel = 1; - } - - if( rc==LSM_OK && nRem>0 ){ - int nPg = 0; - rc = sortedNewToplevel(pDb, TREE_OLD, &nOvfl, &nPg); - nRem -= nPg; - if( rc==LSM_OK && pDb->nTransOpen>0 ){ - lsmTreeDiscardOld(pDb); - } - bFlush = 1; - bToplevel = 0; - } + /* If there exists in-memory data ready to be flushed to disk, attempt + ** to flush it now. 
*/ + if( sortedTreeHasOld(pDb, &rc) ){ + if( sortedDbIsFull(pDb) ){ + int nPg = 0; + rc = sortedWork(pDb, nRem, 0, 1, &nPg); + nRem -= nPg; + assert( rc!=LSM_OK || nRem<=0 || !sortedDbIsFull(pDb) ); + bToplevel = 1; + } + if( rc==LSM_OK && nRem>0 ){ + int nPg = 0; + rc = sortedNewToplevel(pDb, TREE_OLD, &nOvfl, &nPg); + nRem -= nPg; + if( rc==LSM_OK && pDb->nTransOpen>0 ){ + lsmTreeDiscardOld(pDb); + } + bFlush = 1; + bToplevel = 0; } } /* If nPage is still greater than zero, do some merging. */ if( rc==LSM_OK && nRem>0 && bShutdown==0 ){ @@ -4314,11 +4418,10 @@ rc = sortedNewToplevel(pDb, TREE_NONE, &nOvfl, 0); } } if( rc==LSM_OK && (nRem!=nMax) ){ - rc = lsmSortedFlushDb(pDb); lsmFinishWork(pDb, bFlush, nOvfl, &rc); }else{ int rcdummy = LSM_BUSY; assert( rc!=LSM_OK || bFlush==0 ); lsmFinishWork(pDb, 0, 0, &rcdummy); @@ -4459,11 +4562,11 @@ */ static char *segToString(lsm_env *pEnv, Segment *pSeg, int nMin){ int nSize = pSeg->nSize; Pgno iRoot = pSeg->iRoot; Pgno iFirst = pSeg->iFirst; - Pgno iLast = pSeg->iLast; + Pgno iLast = pSeg->iLastPg; char *z; char *z1; char *z2; int nPad; @@ -4542,11 +4645,11 @@ assert( (flags & SEGMENT_BTREE_FLAG) || eType!=0 ); aCell += lsmVarintGet32(aCell, &iPgPtr); if( eType==0 ){ Pgno iRef; /* Page number of referenced page */ - aCell += lsmVarintGet32(aCell, &iRef); + aCell += lsmVarintGet64(aCell, &iRef); lsmFsDbPageGet(pDb->pFS, iRef, &pRef); aKey = pageGetKey(pRef, 0, &iTopic, &nKey, &blob); }else{ aCell += lsmVarintGet32(aCell, &nKey); if( rtIsWrite(eType) ) aCell += lsmVarintGet32(aCell, &nVal); @@ -4577,10 +4680,11 @@ sortedBlobFree(&blob); } static void infoCellDump( lsm_db *pDb, + int bIndirect, /* True to follow indirect refs */ Page *pPg, int iCell, int *peType, int *piPgPtr, u8 **paKey, int *pnKey, @@ -4602,16 +4706,21 @@ aCell += lsmVarintGet32(aCell, &iPgPtr); if( eType==0 ){ int dummy; Pgno iRef; /* Page number of referenced page */ - aCell += lsmVarintGet32(aCell, &iRef); - lsmFsDbPageGet(pDb->pFS, iRef, &pRef); - 
pageGetKeyCopy(pDb->pEnv, pRef, 0, &dummy, pBlob); - aKey = (u8 *)pBlob->pData; - nKey = pBlob->nData; - lsmFsPageRelease(pRef); + aCell += lsmVarintGet64(aCell, &iRef); + if( bIndirect ){ + lsmFsDbPageGet(pDb->pFS, iRef, &pRef); + pageGetKeyCopy(pDb->pEnv, pRef, 0, &dummy, pBlob); + aKey = (u8 *)pBlob->pData; + nKey = pBlob->nData; + lsmFsPageRelease(pRef); + }else{ + aKey = (u8 *)""; + nKey = 11; + } }else{ aCell += lsmVarintGet32(aCell, &nKey); if( rtIsWrite(eType) ) aCell += lsmVarintGet32(aCell, &nVal); sortedReadData(pPg, (aCell-aData), nKey+nVal, (void **)&aKey, pBlob); aVal = &aKey[nKey]; @@ -4635,13 +4744,14 @@ } } return LSM_OK; } -#define INFO_PAGE_DUMP_DATA 0x01 -#define INFO_PAGE_DUMP_VALUES 0x02 -#define INFO_PAGE_DUMP_HEX 0x04 +#define INFO_PAGE_DUMP_DATA 0x01 +#define INFO_PAGE_DUMP_VALUES 0x02 +#define INFO_PAGE_DUMP_HEX 0x04 +#define INFO_PAGE_DUMP_INDIRECT 0x08 static int infoPageDump( lsm_db *pDb, /* Database handle */ Pgno iPg, /* Page number of page to dump */ int flags, @@ -4653,10 +4763,11 @@ const int perLine = 16; /* Bytes per line in the raw hex dump */ int bValues = (flags & INFO_PAGE_DUMP_VALUES); int bHex = (flags & INFO_PAGE_DUMP_HEX); int bData = (flags & INFO_PAGE_DUMP_DATA); + int bIndirect = (flags & INFO_PAGE_DUMP_INDIRECT); *pzOut = 0; if( iPg==0 ) return LSM_ERROR; rc = lsmFsDbPageGet(pDb->pFS, iPg, &pPg); @@ -4682,11 +4793,11 @@ lsmStringAppendf(&str, "flags: %04x\n", flags); lsmStringAppendf(&str, "\n"); for(iCell=0; iCellnKeyWidth ) nKeyWidth = nKey; } if( bHex ) nKeyWidth = nKeyWidth * 2; for(iCell=0; iCellpNext; sortedFreeLevel(pEnv, p); } } -int lsmSortedFlushDb(lsm_db *pDb){ - int rc = LSM_OK; - Level *p; - - assert( pDb->pWorker ); - for(p=lsmDbSnapshotLevel(pDb->pWorker); p && rc==LSM_OK; p=p->pNext){ - Merge *pMerge = p->pMerge; - if( pMerge ){ - pMerge->iOutputOff = -1; - pMerge->bHierReadonly = 1; - } - } - - return LSM_OK; -} - void lsmSortedSaveTreeCursors(lsm_db *pDb){ MultiCursor *pCsr; for(pCsr=pDb->pCsr; pCsr; 
pCsr=pCsr->pNext){ lsmTreeCursorSave(pCsr->apTreeCsr[0]); lsmTreeCursorSave(pCsr->apTreeCsr[1]); Index: src/lsm_tree.c ================================================================== --- src/lsm_tree.c +++ src/lsm_tree.c @@ -2419,10 +2419,11 @@ assert( ((prev&LSM_START_DELETE)==0)==((pKey->flags&LSM_END_DELETE)==0) ); prev = pKey->flags; } tblobFree(csr.pDb, &csr.blob); + tblobFree(csr.pDb, &blob); return 1; } static int treeCountEntries(lsm_db *db){ Index: test/csr1.test ================================================================== --- test/csr1.test +++ test/csr1.test @@ -58,12 +58,12 @@ BEGIN; INSERT INTO t1 VALUES(10, 100); } do_test 2.2 { sqlite4 db2 ./test.db - list [catch { sqlite4_lsm_work db2 main -flush 0 } msg] $msg -} {1 SQLITE4_BUSY} + list [catch { db2 eval { BEGIN ; INSERT INTO t1 VALUES(1, 2) } } msg] $msg +} {1 {database is locked}} do_execsql_test 2.3 { COMMIT } do_test 2.4 { sqlite4_lsm_work db2 main -flush 0 } {0} db2 close Index: test/log1.test ================================================================== --- test/log1.test +++ test/log1.test @@ -132,11 +132,11 @@ db2 close reset_db do_execsql_test 3.5 { CREATE TABLE t1(a, b) } do_test 3.6 { - sqlite4_lsm_work db main -flush -checkpoint 0 + sqlite4_lsm_checkpoint db main for {set i 0} {$i < 203} {incr i} { execsql { INSERT INTO t1 VALUES(randstr(100,100), randstr(100,100)) } } execsql { SELECT count(*) FROM t1 } } {203} @@ -274,14 +274,15 @@ INSERT INTO x VALUES(randstr(10,10), randstr(100,100)); INSERT INTO x VALUES(randstr(10,10), randstr(100,100)); INSERT INTO x VALUES(randstr(10,10), randstr(100,100)); } do_filesize_test 8.2 0 776 -do_test 8.3 { sqlite4_lsm_work db main -flush } 0 +do_test 8.3.1 { sqlite4_lsm_flush db main } {} +do_test 8.3.2 { sqlite4_lsm_work db main } 0 do_execsql_test 8.4 { INSERT INTO x VALUES(randstr(10,10), randstr(100,100)) } do_filesize_test 8.5 12288 915 -do_test 8.6 { sqlite4_lsm_work db main -checkpoint } 0 +do_test 8.6 { sqlite4_lsm_checkpoint db 
main } {} do_test 8.7 { copy_db_files test.db test.db2 sqlite4 db2 test.db2 execsql { SELECT count(*) FROM x ; PRAGMA integrity_check } db2 @@ -356,11 +357,11 @@ do_test 10.8 { sqlite4_lsm_work db main -flush } 0 do_execsql_test 10.9 { INSERT INTO t1 VALUES(randstr(10,10), randstr(100,100)); } do_test 10.9 { sqlite4_lsm_info db main log-structure } {0 0 0 0 0 695} -do_test 10.10 { sqlite4_lsm_work db main -checkpoint } 0 +do_test 10.10 { sqlite4_lsm_checkpoint db main } {} do_test 10.11 { sqlite4_lsm_info db main log-structure } {0 0 0 0 556 695} #------------------------------------------------------------------------- # reset_db @@ -383,11 +384,11 @@ do_test 11.5 { sqlite4_lsm_work db main -flush } 0 do_execsql_test 11.6 { INSERT INTO t1 VALUES(randstr(10,10), randstr(100,100)); } do_test 11.7 { sqlite4_lsm_info db main log-structure } {0 0 0 0 0 1474} -do_test 11.8 { sqlite4_lsm_work db main -checkpoint } 0 +do_test 11.8 { sqlite4_lsm_checkpoint db main } {} do_test 11.9 { sqlite4_lsm_info db main log-structure } {0 0 0 0 1335 1474} do_execsql_test 11.10 { INSERT INTO t1 VALUES(randstr(10,10), randstr(100,100)); } do_test 11.11 { sqlite4_lsm_info db main log-structure } {1335 1482 0 0 0 139} @@ -443,11 +444,11 @@ INSERT INTO t1 VALUES(randstr(10,10), randstr(100,100)); } do_test 11.23 { sqlite4_lsm_info db main log-structure } {1335 1482 0 1259 1483 1908} -do_test 11.24 { sqlite4_lsm_work db main -checkpoint } {0} +do_test 11.24 { sqlite4_lsm_checkpoint db main } {} do_test 11.25 { sqlite4_lsm_info db main log-structure } {0 0 0 0 1769 1908} #------------------------------------------------------------------------- @@ -462,11 +463,11 @@ expr srand(0) do_test 12.3.$iTest { for {set i 0} {$i < 10} {incr i} { execsql { INSERT INTO t1 VALUES(randstr(20,20), randstr(100,100)) } if { int(rand()*10.0)==0 } { sqlite4_lsm_work db main -flush } - if { int(rand()*10.0)==0 } { sqlite4_lsm_work db main -checkpoint } + if { int(rand()*10.0)==0 } { sqlite4_lsm_checkpoint db 
main } } copy_db_files test.db test.db2 sqlite4 db2 test.db2 set sql "SELECT count(*) FROM t1 ; " if {0==($iTest % 25)} { Index: test/log3.test ================================================================== --- test/log3.test +++ test/log3.test @@ -54,14 +54,14 @@ INSERT INTO t1 VALUES(randstr(50,50), randstr(50,50)); COMMIT; } {} do_filesize_test 2.5 0 2048 -do_test 2.6 { sqlite4_lsm_work db main -flush 0 } {0} +do_test 2.6 { sqlite4_lsm_flush db main } {} do_execsql_test 2.7 { INSERT INTO t1 VALUES(randstr(50,50), randstr(50,50)) } -do_test 2.8 { sqlite4_lsm_work db main -check 0 } {0} -do_test 2.9 { sqlite4_lsm_info db main log-structure } {0 0 0 0 2048 2560} +do_test 2.8 { sqlite4_lsm_checkpoint db main } {} +do_test 2.9 { sqlite4_lsm_info db main log-structure } {0 0 0 0 2560 3072} for {set i 1} {$i <= 6} {incr i} { do_execsql_test 2.10.$i.1 { INSERT INTO t1 VALUES(randstr(50,50), randstr(50,50)); } @@ -69,9 +69,9 @@ do_recover_test 2.10.$i.3 { SELECT count(*) FROM t1 } [expr 8 + $i] } do_test 2.11 { sqlite4_lsm_info db main log-structure -} {2048 2568 0 1704 3072 4608} +} {2560 3080 0 2216 3584 4608} finish_test Index: test/permutations.test ================================================================== --- test/permutations.test +++ test/permutations.test @@ -132,11 +132,11 @@ lappend ::testsuitelist xxx test_suite "src4" -prefix "" -description { } -files { simple.test simple2.test - log1.test log2.test log3.test + log3.test csr1.test ckpt1.test mc1.test aggerror.test Index: test/simple.test ================================================================== --- test/simple.test +++ test/simple.test @@ -1367,10 +1367,13 @@ select * from maintable, joinme INDEXED by joinme_id_text_idx } {1 {cannot use index: joinme_id_text_idx}} #------------------------------------------------------------------------- # This is testing that the "phantom" runs feature works. +# +# UPDATE: Said feature was dropped early in development. But the test +# remains valid. 
reset_db do_execsql_test 71.1 { CREATE TABLE t1(x); INSERT INTO t1 VALUES(randomblob(1024)); -- 1 INSERT INTO t1 SELECT randomblob(1024) FROM t1; -- 2 @@ -1388,15 +1391,18 @@ expr {[file size test.db] < 256*1024} } {1} #------------------------------------------------------------------------- # This is testing that the "phantom" runs feature works with mmap. +# +# UPDATE: Said feature was dropped early in development. But the test +# remains valid. reset_db -do_test 72.0.1 { sqlite4_lsm_config db main mmap } 0 -do_test 72.0.2 { sqlite4_lsm_config db main mmap 1 } 1 -do_test 72.0.3 { sqlite4_lsm_config db main mmap } 1 +#do_test 72.0.1 { sqlite4_lsm_config db main mmap } 0 +#do_test 72.0.2 { sqlite4_lsm_config db main mmap 1 } 1 +#do_test 72.0.3 { sqlite4_lsm_config db main mmap } 1 do_execsql_test 72.1 { CREATE TABLE t1(x); INSERT INTO t1 VALUES(randomblob(1024)); -- 1 INSERT INTO t1 SELECT randomblob(1024) FROM t1; -- 2 Index: test/test_lsm.c ================================================================== --- test/test_lsm.c +++ test/test_lsm.c @@ -152,11 +152,10 @@ struct Switch { const char *zSwitch; int flags; } aSwitch[] = { { "-flush", LSM_WORK_FLUSH }, - { "-checkpoint", LSM_WORK_CHECKPOINT }, { "-optimize", LSM_WORK_OPTIMIZE }, { 0, 0 } }; int flags = 0; @@ -205,22 +204,106 @@ } Tcl_SetObjResult(interp, Tcl_NewIntObj(nWork)); return TCL_OK; } + +/* +** TCLCMD: sqlite4_lsm_checkpoint DB DBNAME +*/ +static int test_lsm_checkpoint( + void * clientData, + Tcl_Interp *interp, + int objc, + Tcl_Obj *CONST objv[] +){ + const char *zDb; + const char *zName; + int rc; + sqlite4 *db; + lsm_db *pLsm; + + if( objc!=3 ){ + Tcl_WrongNumArgs(interp, 1, objv, "DB DBNAME"); + return TCL_ERROR; + } + zDb = Tcl_GetString(objv[1]); + zName = Tcl_GetString(objv[2]); + + rc = getDbPointer(interp, zDb, &db); + if( rc!=TCL_OK ) return rc; + + rc = sqlite4_kvstore_control(db, zName, SQLITE4_KVCTRL_LSM_HANDLE, &pLsm); + if( rc==SQLITE4_OK ){ + rc = lsm_checkpoint(pLsm, 0); + } + 
if( rc!=SQLITE4_OK ){ + Tcl_SetResult(interp, (char *)sqlite4TestErrorName(rc), TCL_STATIC); + return TCL_ERROR; + } + + Tcl_ResetResult(interp); + return TCL_OK; +} + +/* +** TCLCMD: sqlite4_lsm_flush DB DBNAME +*/ +static int test_lsm_flush( + void * clientData, + Tcl_Interp *interp, + int objc, + Tcl_Obj *CONST objv[] +){ + const char *zDb; + const char *zName; + int rc; + sqlite4 *db; + lsm_db *pLsm; + + if( objc!=3 ){ + Tcl_WrongNumArgs(interp, 1, objv, "DB DBNAME"); + return TCL_ERROR; + } + zDb = Tcl_GetString(objv[1]); + zName = Tcl_GetString(objv[2]); + + rc = getDbPointer(interp, zDb, &db); + if( rc!=TCL_OK ) return rc; + + rc = sqlite4_kvstore_control(db, zName, SQLITE4_KVCTRL_LSM_HANDLE, &pLsm); + if( rc==SQLITE4_OK ){ + int nZero = 0; + int nOrig = -1; + lsm_config(pLsm, LSM_CONFIG_WRITE_BUFFER, &nOrig); + lsm_config(pLsm, LSM_CONFIG_WRITE_BUFFER, &nZero); + rc = lsm_begin(pLsm, 1); + if( rc==LSM_OK ) rc = lsm_commit(pLsm, 0); + lsm_config(pLsm, LSM_CONFIG_WRITE_BUFFER, &nOrig); + } + if( rc!=SQLITE4_OK ){ + Tcl_SetResult(interp, (char *)sqlite4TestErrorName(rc), TCL_STATIC); + return TCL_ERROR; + } + + Tcl_ResetResult(interp); + return TCL_OK; +} int SqlitetestLsm_Init(Tcl_Interp *interp){ struct SyscallCmd { const char *zName; Tcl_ObjCmdProc *xCmd; } aCmd[] = { - { "sqlite4_lsm_work", test_lsm_work }, - { "sqlite4_lsm_info", test_lsm_info }, - { "sqlite4_lsm_config", test_lsm_config }, + { "sqlite4_lsm_work", test_lsm_work }, + { "sqlite4_lsm_checkpoint", test_lsm_checkpoint }, + { "sqlite4_lsm_flush", test_lsm_flush }, + { "sqlite4_lsm_info", test_lsm_info }, + { "sqlite4_lsm_config", test_lsm_config }, }; int i; for(i=0; i