Index: lsm-test/lsmtest_main.c ================================================================== --- lsm-test/lsmtest_main.c +++ lsm-test/lsmtest_main.c @@ -565,11 +565,11 @@ " -write $write (default value 10000)\n" " -pause $pause (default value 0)\n" " -fetch $fetch (default value 0)\n" " -keysize $keysize (default value 12)\n" " -valsize $valsize (default value 100)\n" -" -system $system (default value \"lsm\"\n" +" -system $system (default value \"lsm\")\n" "\n" ); } int do_speed_test2(int nArg, char **azArg){ @@ -644,11 +644,11 @@ printf("#"); for(i=0; i=0 ){ printf(" %s=%d", &aOpt[i].zOpt[1], aParam[aOpt[i].eVal]); - }else{ + }else if( aOpt[i].eVal==-1 ){ printf(" %s=\"%s\"", &aOpt[i].zOpt[1], zSystem); } } } printf("\n"); @@ -1428,11 +1428,11 @@ #include static void lsmtest_rusage_report(void){ int res; struct rusage r; - memset(&r, sizeof(r), 0); + memset(&r, 0, sizeof(r)); res = getrusage(RUSAGE_SELF, &r); assert( res==0 ); printf("# getrusage: { ru_maxrss %d ru_oublock %d ru_inblock %d }\n", @@ -1456,11 +1456,11 @@ {"insert", do_insert}, {"replay", do_replay}, {"speed", do_speed_tests}, - {"speed2", do_speed_test2}, + {"speed2", do_speed_test2}, {"show", st_do_show}, {"work", st_do_work}, {"test", do_test}, {0, 0} }; Index: lsm-test/lsmtest_tdb3.c ================================================================== --- lsm-test/lsmtest_tdb3.c +++ lsm-test/lsmtest_tdb3.c @@ -814,14 +814,14 @@ z++; zStart = z; while( *z>='0' && *z<='9' ) z++; if( *z=='k' || *z=='K' ){ - iMul = 1024; + iMul = 1; z++; }else if( *z=='M' || *z=='M' ){ - iMul = 1024 * 1024; + iMul = 1024; z++; } nParam = z-zStart; if( nParam==0 || nParam>sizeof(zParam)-1 ) goto syntax_error; memcpy(zParam, zStart, nParam); @@ -839,14 +839,14 @@ break; case TEST_MT_MODE: if( pLsm ) nThread = iVal; break; case TEST_MT_MIN_CKPT: - if( pLsm && iVal>0 ) pLsm->nMtMinCkpt = iVal; + if( pLsm && iVal>0 ) pLsm->nMtMinCkpt = iVal*1024; break; case TEST_MT_MAX_CKPT: - if( pLsm && iVal>0 ) pLsm->nMtMaxCkpt = iVal; + if( pLsm && iVal>0 ) pLsm->nMtMaxCkpt = iVal*1024; break; #ifdef HAVE_ZLIB case TEST_COMPRESSION: testConfigureCompression(db); break; Index: main.mk ================================================================== --- main.mk +++ main.mk @@ -42,16 +42,16 @@ ################################################################################ # FIXME: Required options for now. # OPTS += -DLSM_MUTEX_NONE -OPTS += -DSQLITE4_DEBUG=1 -DLSM_DEBUG=1 +#OPTS += -DSQLITE4_DEBUG=1 -DLSM_DEBUG=1 OPTS += -DHAVE_GMTIME_R OPTS += -DHAVE_LOCALTIME_R OPTS += -DHAVE_MALLOC_USABLE_SIZE OPTS += -DHAVE_USLEEP -OPTS += -DSQLITE4_MEMDEBUG=1 +#OPTS += -DSQLITE4_MEMDEBUG=1 #OPTS += -DSQLITE4_NO_SYNC=1 -DLSM_NO_SYNC=1 OPTS += -DSQLITE4_OMIT_ANALYZE OPTS += -DSQLITE4_OMIT_AUTOMATIC_INDEX OPTS += -DSQLITE4_OMIT_BTREECOUNT OPTS += -DSQLITE4_OMIT_VIRTUALTABLE=1 Index: src/lsmInt.h ================================================================== --- src/lsmInt.h +++ src/lsmInt.h @@ -168,10 +168,12 @@ #define LSM_END_DELETE 0x02 /* End of open-ended delete range */ #define LSM_POINT_DELETE 0x04 /* Delete this key */ #define LSM_INSERT 0x08 /* Insert this key and value */ #define LSM_SEPARATOR 0x10 /* True if entry is separator key only */ #define LSM_SYSTEMKEY 0x20 /* True if entry is a system key (FREELIST) */ + +#define LSM_CONTIGUOUS 0x40 /* Used in lsm_tree.c */ /* ** A string that can grow by appending. */ struct LsmString { Index: src/lsm_file.c ================================================================== --- src/lsm_file.c +++ src/lsm_file.c @@ -1330,10 +1330,11 @@ fsPageAddToLru(pFS, p); p->pFS = pFS; } p->aData = &((u8 *)pFS->pMap)[pFS->nPagesize * (iReal-1)]; p->iPg = iReal; + assert( (p->flags & PAGE_FREE)==0 ); }else{ /* Search the hash-table for the page */ iHash = fsHashKey(pFS->nHash, iReal); for(p=pFS->apHash[iHash]; p; p=p->pHashNext){ Index: src/lsm_main.c ================================================================== --- src/lsm_main.c +++ src/lsm_main.c @@ -753,20 +753,20 @@ ** ** If there are currently no other open cursor handles, and no open write ** transaction, open a read transaction here. */ int lsm_csr_open(lsm_db *pDb, lsm_cursor **ppCsr){ - int rc; /* Return code */ + int rc = LSM_OK; /* Return code */ MultiCursor *pCsr = 0; /* New cursor object */ /* Open a read transaction if one is not already open. */ assert_db_state(pDb); if( pDb->pShmhdr==0 ){ assert( pDb->bReadonly ); rc = lsmBeginRoTrans(pDb); - }else{ + }else if( pDb->iReader<0 ){ rc = lsmBeginReadTrans(pDb); } /* Allocate the multi-cursor. */ if( rc==LSM_OK ) rc = lsmMCursorNew(pDb, &pCsr); Index: src/lsm_shared.c ================================================================== --- src/lsm_shared.c +++ src/lsm_shared.c @@ -1335,19 +1335,21 @@ /* ** Open a write transaction. */ int lsmBeginWriteTrans(lsm_db *pDb){ - int rc; /* Return code */ + int rc = LSM_OK; /* Return code */ ShmHeader *pShm = pDb->pShmhdr; /* Shared memory header */ assert( pDb->nTransOpen==0 ); assert( pDb->bDiscardOld==0 ); assert( pDb->bReadonly==0 ); /* If there is no read-transaction open, open one now. */ - rc = lsmBeginReadTrans(pDb); + if( pDb->iReader<0 ){ + rc = lsmBeginReadTrans(pDb); + } /* Attempt to take the WRITER lock */ if( rc==LSM_OK ){ rc = lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_EXCL, 0); } Index: src/lsm_tree.c ================================================================== --- src/lsm_tree.c +++ src/lsm_tree.c @@ -265,10 +265,17 @@ static void intArrayTruncate(IntArray *p, int nVal){ p->nArray = nVal; } /* End of IntArray methods. ***********************************************************************/ + +static int treeKeycmp(void *p1, int n1, void *p2, int n2){ + int res; + res = memcmp(p1, p2, LSM_MIN(n1, n2)); + if( res==0 ) res = (n1-n2); + return res; +} /* ** The pointer passed as the first argument points to an interior node, ** not a leaf. This function returns the offset of the iCell'th child ** sub-tree of the node. @@ -285,20 +292,23 @@ static int treeOffsetToChunk(u32 iOff){ assert( LSM_SHM_CHUNK_SIZE==(1<<15) ); return (int)(iOff>>15); } +#define treeShmptrUnsafe(pDb, iPtr) \ +(&((u8*)((pDb)->apShm[(iPtr)>>15]))[(iPtr) & (LSM_SHM_CHUNK_SIZE-1)]) + /* ** Return a pointer to the mapped memory location associated with *-shm ** file offset iPtr. */ static void *treeShmptr(lsm_db *pDb, u32 iPtr){ assert( (iPtr>>15)nShm ); assert( pDb->apShm[iPtr>>15] ); - return iPtr?(&((u8*)(pDb->apShm[iPtr>>15]))[iPtr & (LSM_SHM_CHUNK_SIZE-1)]):0; + return iPtr ? treeShmptrUnsafe(pDb, iPtr) : 0; } static ShmChunk * treeShmChunk(lsm_db *pDb, int iChunk){ return (ShmChunk *)(pDb->apShm[iChunk]); } @@ -563,15 +573,20 @@ /* ** Return a pointer to the mapping of the TreeKey object that the cursor ** is pointing to. */ static TreeKey *csrGetKey(TreeCursor *pCsr, TreeBlob *pBlob, int *pRc){ - TreeKey *pRet = (TreeKey *)treeShmkey(pCsr->pDb, - pCsr->apTreeNode[pCsr->iNode]->aiKeyPtr[pCsr->aiCell[pCsr->iNode]], - TKV_LOADVAL, pBlob, pRc - ); - assert( pRet==0 || assertFlagsOk(pRet->flags) ); + TreeKey *pRet; + lsm_db *pDb = pCsr->pDb; + u32 iPtr = pCsr->apTreeNode[pCsr->iNode]->aiKeyPtr[pCsr->aiCell[pCsr->iNode]]; + + assert( iPtr ); + pRet = treeShmptrUnsafe(pDb, iPtr); + if( !(pRet->flags & LSM_CONTIGUOUS) ){ + pRet = treeShmkey(pDb, iPtr, TKV_LOADVAL, pBlob, pRc); + } + return pRet; } /* ** Save the current position of tree cursor pCsr. @@ -719,10 +734,11 @@ void *pVal, int nVal, /* Value data (or nVal<0 for delete) */ int *pRc ){ TreeKey *p; u32 iPtr; + u32 iEnd; int nRem; u8 *a; int n; /* Allocate space for the TreeKey structure itself */ @@ -752,10 +768,17 @@ } a = pVal; n = nRem = nVal; pVal = 0; } + + iEnd = iPtr + sizeof(TreeKey) + nKey + LSM_MAX(0, nVal); + if( (iPtr & ~(LSM_SHM_CHUNK_SIZE-1))!=(iEnd & ~(LSM_SHM_CHUNK_SIZE-1)) ){ + p->flags = 0; + }else{ + p->flags = LSM_CONTIGUOUS; + } if( *pRc ) return 0; #if 0 printf("store: %d %s\n", (int)iPtr, (char *)pKey); #endif @@ -1438,10 +1461,11 @@ assert( nVal>=0 || pVal==0 ); assert_tree_looks_ok(LSM_OK, pTree); assert( flags==LSM_INSERT || flags==LSM_POINT_DELETE || flags==LSM_START_DELETE || flags==LSM_END_DELETE ); + assert( (flags & LSM_CONTIGUOUS)==0 ); #if 0 dump_tree_contents(pDb, "before"); #endif if( p->iRoot ){ @@ -1495,11 +1519,12 @@ } /* Allocate and populate a new key-value pair structure */ pTreeKey = newTreeKey(pDb, &iTreeKey, pKey, nKey, pVal, nVal, &rc); if( rc!=LSM_OK ) return rc; - pTreeKey->flags = flags; + assert( pTreeKey->flags==0 || pTreeKey->flags==LSM_CONTIGUOUS ); + pTreeKey->flags |= flags; if( p->iRoot==0 ){ /* The tree is completely empty. Add a new root node and install ** (pKey/nKey) as the middle entry. Even though it is a leaf at the ** moment, use newTreeNode() to allocate the node (i.e. allocate enough @@ -1781,11 +1806,11 @@ int bDone = 0; TreeRoot *p = &db->treehdr.root; TreeBlob blob = {0, 0}; /* The range must be sensible - that (key1 < key2). */ - assert( db->xCmp(pKey1, nKey1, pKey2, nKey2)<0 ); + assert( treeKeycmp(pKey1, nKey1, pKey2, nKey2)<0 ); assert( assert_delete_ranges_match(db) ); #if 0 static int nCall = 0; printf("\n"); @@ -1813,11 +1838,11 @@ ** tree now contains no keys in the range being deleted. In this case ** break out of the loop. */ bDone = 1; if( lsmTreeCursorValid(&csr) ){ lsmTreeCursorKey(&csr, 0, &pDel, &nDel); - if( db->xCmp(pDel, nDel, pKey2, nKey2)<0 ) bDone = 0; + if( treeKeycmp(pDel, nDel, pKey2, nKey2)<0 ) bDone = 0; } if( bDone==0 ){ if( csr.iNode==(p->nHeight-1) ){ /* The element to delete already lies on a leaf node */ @@ -1930,11 +1955,11 @@ int cmp = 0; int rc = LSM_OK; assert( pCsr->iNode>=0 ); p = csrGetKey(pCsr, &pCsr->blob, &rc); if( p ){ - cmp = pCsr->pDb->xCmp(TKV_KEY(p), p->nKey, pKey, nKey); + cmp = treeKeycmp(TKV_KEY(p), p->nKey, pKey, nKey); } return cmp; } #endif @@ -1955,12 +1980,10 @@ */ int lsmTreeCursorSeek(TreeCursor *pCsr, void *pKey, int nKey, int *pRes){ int rc = LSM_OK; /* Return code */ lsm_db *pDb = pCsr->pDb; TreeRoot *pRoot = pCsr->pRoot; - int (*xCmp)(void *, int, void *, int) = pDb->xCmp; - u32 iNodePtr; /* Location of current node in search */ /* Discard any saved position data */ treeCursorRestore(pCsr, 0); @@ -1975,41 +1998,49 @@ int res = 0; /* Result of comparison function */ int iNode = -1; while( iNodePtr ){ TreeNode *pNode; /* Node at location iNodePtr */ int iTest; /* Index of second key to test (0 or 2) */ + u32 iTreeKey; TreeKey *pTreeKey; /* Key to compare against */ - pNode = (TreeNode *)treeShmptr(pDb, iNodePtr); + pNode = (TreeNode *)treeShmptrUnsafe(pDb, iNodePtr); iNode++; pCsr->apTreeNode[iNode] = pNode; /* Compare (pKey/nKey) with the key in the middle slot of B-tree node ** pNode. The middle slot is never empty. If the comparison is a match, ** then the search is finished. Break out of the loop. */ - pTreeKey = treeShmkey(pDb, pNode->aiKeyPtr[1], TKV_LOADKEY, &b, &rc); - if( rc!=LSM_OK ) break; - res = xCmp((void *)&pTreeKey[1], pTreeKey->nKey, pKey, nKey); + pTreeKey = treeShmptrUnsafe(pDb, pNode->aiKeyPtr[1]); + if( !(pTreeKey->flags & LSM_CONTIGUOUS) ){ + pTreeKey = treeShmkey(pDb, pNode->aiKeyPtr[1], TKV_LOADKEY, &b, &rc); + if( rc!=LSM_OK ) break; + } + res = treeKeycmp((void *)&pTreeKey[1], pTreeKey->nKey, pKey, nKey); if( res==0 ){ pCsr->aiCell[iNode] = 1; break; } /* Based on the results of the previous comparison, compare (pKey/nKey) ** to either the left or right key of the B-tree node, if such a key ** exists. */ iTest = (res>0 ? 0 : 2); - pTreeKey = treeShmkey(pDb, pNode->aiKeyPtr[iTest], TKV_LOADKEY, &b, &rc); - if( rc ) break; - if( pTreeKey==0 ){ - iTest = 1; - }else{ - res = xCmp((void *)&pTreeKey[1], pTreeKey->nKey, pKey, nKey); + iTreeKey = pNode->aiKeyPtr[iTest]; + if( iTreeKey ){ + pTreeKey = treeShmptrUnsafe(pDb, iTreeKey); + if( !(pTreeKey->flags & LSM_CONTIGUOUS) ){ + pTreeKey = treeShmkey(pDb, iTreeKey, TKV_LOADKEY, &b, &rc); + if( rc ) break; + } + res = treeKeycmp((void *)&pTreeKey[1], pTreeKey->nKey, pKey, nKey); if( res==0 ){ pCsr->aiCell[iNode] = iTest; break; } + }else{ + iTest = 1; } if( iNode<(pRoot->nHeight-1) ){ iNodePtr = getChildPtr(pNode, pRoot->iTransId, iTest + (res<0)); }else{ @@ -2090,11 +2121,11 @@ } #ifndef NDEBUG if( pCsr->iNode>=0 ){ TreeKey *pK2 = csrGetKey(pCsr, &pCsr->blob, &rc); - assert( rc || pDb->xCmp(TKV_KEY(pK2),pK2->nKey,TKV_KEY(pK1),pK1->nKey)>=0 ); + assert( rc||treeKeycmp(TKV_KEY(pK2),pK2->nKey,TKV_KEY(pK1),pK1->nKey)>=0 ); } tblobFree(pDb, &key1); #endif return rc; @@ -2158,11 +2189,11 @@ } #ifndef NDEBUG if( pCsr->iNode>=0 ){ TreeKey *pK2 = csrGetKey(pCsr, &pCsr->blob, &rc); - assert( rc || pDb->xCmp(TKV_KEY(pK2), pK2->nKey, TKV_KEY(pK1), pK1->nKey)<0 ); + assert( rc || treeKeycmp(TKV_KEY(pK2),pK2->nKey,TKV_KEY(pK1),pK1->nKey)<0 ); } tblobFree(pDb, &key1); #endif return rc; @@ -2212,15 +2243,15 @@ int lsmTreeCursorFlags(TreeCursor *pCsr){ int flags = 0; if( pCsr && pCsr->iNode>=0 ){ int rc = LSM_OK; - TreeKey *pKey = (TreeKey *)treeShmptr(pCsr->pDb, + TreeKey *pKey = (TreeKey *)treeShmptrUnsafe(pCsr->pDb, pCsr->apTreeNode[pCsr->iNode]->aiKeyPtr[pCsr->aiCell[pCsr->iNode]] ); assert( rc==LSM_OK ); - flags = pKey->flags; + flags = (pKey->flags & ~LSM_CONTIGUOUS); } return flags; } int lsmTreeCursorKey(TreeCursor *pCsr, int *pFlags, void **ppKey, int *pnKey){ Index: src/vdbe.c ================================================================== --- src/vdbe.c +++ src/vdbe.c @@ -2571,11 +2571,11 @@ break; } /* Opcode: VerifyCookie P1 P2 P3 * * ** -** cHECK THe value of global database parameter number 0 (the +** Check the value of global database parameter number 0 (the ** schema version) and make sure it is equal to P2 and that the ** generation counter on the local schema parse equals P3. ** ** P1 is the database number which is 0 for the main database file ** and 1 for the file holding temporary tables and some higher number