Many hyperlinks are disabled. Use anonymous login to enable hyperlinks.
Overview
| SHA1 Hash: | 1b21fb4494efa5d14fc33ef7ecebfb957084d702 |
|---|---|
| Date: | 2012-11-17 13:17:25 |
| User: | dan |
| Comment: | Fix another out-of-order writes problem. |
Tags And Properties
- branch=trunk inherited from [84d5dea8fd]
- sym-trunk inherited from [84d5dea8fd]
Changes
Changes to src/lsmInt.h
686 int lsmFsTruncateLog(FileSystem *pFS, i64 nByte); 686 int lsmFsTruncateLog(FileSystem *pFS, i64 nByte); 687 int lsmFsCloseAndDeleteLog(FileSystem *pFS); 687 int lsmFsCloseAndDeleteLog(FileSystem *pFS); 688 688 689 void lsmFsDeferClose(FileSystem *pFS, LsmFile **pp); 689 void lsmFsDeferClose(FileSystem *pFS, LsmFile **pp); 690 690 691 /* And to sync the db file */ 691 /* And to sync the db file */ 692 int lsmFsSyncDb(FileSystem *); 692 int lsmFsSyncDb(FileSystem *); > 693 > 694 void lsmFsFlushWaiting(FileSystem *, int *); 693 695 694 /* Used by lsm_info(ARRAY_STRUCTURE) and lsm_config(MMAP) */ 696 /* Used by lsm_info(ARRAY_STRUCTURE) and lsm_config(MMAP) */ 695 int lsmInfoArrayStructure(lsm_db *pDb, Pgno iFirst, char **pzOut); 697 int lsmInfoArrayStructure(lsm_db *pDb, Pgno iFirst, char **pzOut); 696 int lsmInfoArrayPages(lsm_db *pDb, Pgno iFirst, char **pzOut); 698 int lsmInfoArrayPages(lsm_db *pDb, Pgno iFirst, char **pzOut); 697 int lsmConfigMmap(lsm_db *pDb, int *piParam); 699 int lsmConfigMmap(lsm_db *pDb, int *piParam); 698 700 699 int lsmEnvOpen(lsm_env *, const char *, lsm_file **); 701 int lsmEnvOpen(lsm_env *, const char *, lsm_file **);
Changes to src/lsm_file.c
195 195 196 /* mmap() mode things */ 196 /* mmap() mode things */ 197 int bUseMmap; /* True to use mmap() to access db file */ 197 int bUseMmap; /* True to use mmap() to access db file */ 198 void *pMap; /* Current mapping of database file */ 198 void *pMap; /* Current mapping of database file */ 199 i64 nMap; /* Bytes mapped at pMap */ 199 i64 nMap; /* Bytes mapped at pMap */ 200 Page *pFree; 200 Page *pFree; 201 201 > 202 Page *pWaiting; /* b-tree pages waiting to be written */ > 203 202 /* Statistics */ 204 /* Statistics */ 203 int nWrite; /* Total number of pages written */ 205 int nWrite; /* Total number of pages written */ 204 int nRead; /* Total number of pages read */ 206 int nRead; /* Total number of pages read */ 205 207 206 /* Page cache parameters for non-mmap() mode */ 208 /* Page cache parameters for non-mmap() mode */ 207 int nOut; /* Number of outstanding pages */ 209 int nOut; /* Number of outstanding pages */ 208 int nCacheMax; /* Configured cache size (in pages) */ 210 int nCacheMax; /* Configured cache size (in pages) */ ................................................................................................................................................................................ 237 Page *pLruPrev; /* Previous page in LRU list */ 239 Page *pLruPrev; /* Previous page in LRU list */ 238 FileSystem *pFS; /* File system that owns this page */ 240 FileSystem *pFS; /* File system that owns this page */ 239 241 240 /* Only used in compressed database mode: */ 242 /* Only used in compressed database mode: */ 241 int nCompress; /* Compressed size (or 0 for uncomp. db) */ 243 int nCompress; /* Compressed size (or 0 for uncomp. 
db) */ 242 int nCompressPrev; /* Compressed size of prev page */ 244 int nCompressPrev; /* Compressed size of prev page */ 243 Segment *pSeg; /* Segment this page will be written to */ 245 Segment *pSeg; /* Segment this page will be written to */ > 246 > 247 /* Fix this up somehow */ > 248 Page *pNextWaiting; 244 }; 249 }; 245 250 246 /* 251 /* 247 ** Meta-data page handle. There are two meta-data pages at the start of 252 ** Meta-data page handle. There are two meta-data pages at the start of 248 ** the database file, each FileSystem.nMetasize bytes in size. 253 ** the database file, each FileSystem.nMetasize bytes in size. 249 */ 254 */ 250 struct MetaPage { 255 struct MetaPage { ................................................................................................................................................................................ 1581 1586 1582 /* 1587 /* 1583 ** Mark the sorted run passed as the second argument as finished. 1588 ** Mark the sorted run passed as the second argument as finished. 1584 */ 1589 */ 1585 int lsmFsSortedFinish(FileSystem *pFS, Segment *p){ 1590 int lsmFsSortedFinish(FileSystem *pFS, Segment *p){ 1586 int rc = LSM_OK; 1591 int rc = LSM_OK; 1587 if( p && p->iLastPg ){ 1592 if( p && p->iLastPg ){ 1588 int iBlk; < 1589 1593 1590 /* Check if the last page of this run happens to be the last of a block. 1594 /* Check if the last page of this run happens to be the last of a block. 1591 ** If it is, then an extra block has already been allocated for this run. 1595 ** If it is, then an extra block has already been allocated for this run. 1592 ** Shift this extra block back to the free-block list. 1596 ** Shift this extra block back to the free-block list. 1593 ** 1597 ** 1594 ** Otherwise, add the first free page in the last block used by the run 1598 ** Otherwise, add the first free page in the last block used by the run 1595 ** to the lAppend list. 1599 ** to the lAppend list. 
1596 */ 1600 */ 1597 iBlk = fsPageToBlock(pFS, p->iLastPg); < 1598 if( fsLastPageOnPagesBlock(pFS, p->iLastPg)!=p->iLastPg ){ 1601 if( fsLastPageOnPagesBlock(pFS, p->iLastPg)!=p->iLastPg ){ 1599 int i; 1602 int i; 1600 Pgno *aiAppend = pFS->pDb->pWorker->aiAppend; 1603 Pgno *aiAppend = pFS->pDb->pWorker->aiAppend; 1601 for(i=0; i<LSM_APPLIST_SZ; i++){ 1604 for(i=0; i<LSM_APPLIST_SZ; i++){ 1602 if( aiAppend[i]==0 ){ 1605 if( aiAppend[i]==0 ){ 1603 aiAppend[i] = p->iLastPg+1; 1606 aiAppend[i] = p->iLastPg+1; 1604 break; 1607 break; ................................................................................................................................................................................ 1902 } 1905 } 1903 } 1906 } 1904 1907 1905 pSeg->nSize++; 1908 pSeg->nSize++; 1906 pSeg->iLastPg = *piNew; 1909 pSeg->iLastPg = *piNew; 1907 return LSM_OK; 1910 return LSM_OK; 1908 } 1911 } > 1912 > 1913 void lsmFsFlushWaiting(FileSystem *pFS, int *pRc){ > 1914 int rc = *pRc; > 1915 Page *pPg; > 1916 > 1917 pPg = pFS->pWaiting; > 1918 pFS->pWaiting = 0; > 1919 > 1920 while( pPg ){ > 1921 Page *pNext = pPg->pNextWaiting; > 1922 if( rc==LSM_OK ) rc = lsmFsPagePersist(pPg); > 1923 assert( pPg->nRef==1 ); > 1924 lsmFsPageRelease(pPg); > 1925 pPg = pNext; > 1926 } > 1927 *pRc = rc; > 1928 } 1909 1929 1910 /* 1930 /* 1911 ** If the page passed as an argument is dirty, update the database file 1931 ** If the page passed as an argument is dirty, update the database file 1912 ** (or mapping of the database file) with its current contents and mark 1932 ** (or mapping of the database file) with its current contents and mark 1913 ** the page as clean. 1933 ** the page as clean. 
1914 ** 1934 ** 1915 ** Return LSM_OK if the operation is a success, or an LSM error code 1935 ** Return LSM_OK if the operation is a success, or an LSM error code ................................................................................................................................................................................ 1940 iHash = fsHashKey(pFS->nHash, pPg->iPg); 1960 iHash = fsHashKey(pFS->nHash, pPg->iPg); 1941 pPg->pHashNext = pFS->apHash[iHash]; 1961 pPg->pHashNext = pFS->apHash[iHash]; 1942 pFS->apHash[iHash] = pPg; 1962 pFS->apHash[iHash] = pPg; 1943 1963 1944 pPg->pSeg->nSize += (sizeof(aSz) * 2) + pPg->nCompress; 1964 pPg->pSeg->nSize += (sizeof(aSz) * 2) + pPg->nCompress; 1945 1965 1946 }else{ 1966 }else{ 1947 i64 iOff; /* Offset to write within database file */ < 1948 1967 1949 if( pPg->iPg==0 ){ 1968 if( pPg->iPg==0 ){ 1950 /* No page number has been assigned yet. This occurs with pages used 1969 /* No page number has been assigned yet. This occurs with pages used 1951 ** in the b-tree hierarchy. */ | 1970 ** in the b-tree hierarchy. They were not assigned page numbers when > 1971 ** they were created as doing so would cause this call to > 1972 ** lsmFsPagePersist() to write an out-of-order page. Instead a page > 1973 ** number is assigned here so that the page data will be appended > 1974 ** to the current segment. > 1975 */ > 1976 Page **pp; 1952 int iPrev = 0; 1977 int iPrev = 0; 1953 int iNext = 0; 1978 int iNext = 0; 1954 int iHash; 1979 int iHash; 1955 1980 1956 assert( pPg->pSeg->iFirst ); 1981 assert( pPg->pSeg->iFirst ); 1957 assert( pPg->flags & PAGE_FREE ); 1982 assert( pPg->flags & PAGE_FREE ); 1958 assert( (pPg->flags & PAGE_HASPREV)==0 ); 1983 assert( (pPg->flags & PAGE_HASPREV)==0 ); ................................................................................................................................................................................ 
1975 assert( iPrev==0 ); 2000 assert( iPrev==0 ); 1976 lsmPutU32(&pPg->aData[pPg->nData], iNext); 2001 lsmPutU32(&pPg->aData[pPg->nData], iNext); 1977 }else{ 2002 }else{ 1978 int nData = pPg->nData; 2003 int nData = pPg->nData; 1979 pPg->nData += 4; 2004 pPg->nData += 4; 1980 lsmSortedExpandBtreePage(pPg, nData); 2005 lsmSortedExpandBtreePage(pPg, nData); 1981 } 2006 } 1982 } | 2007 > 2008 pPg->nRef++; > 2009 for(pp=&pFS->pWaiting; *pp; pp=&(*pp)->pNextWaiting); > 2010 *pp = pPg; > 2011 assert( pPg->pNextWaiting==0 ); 1983 2012 > 2013 }else{ > 2014 i64 iOff; /* Offset to write within database file */ > 2015 1984 iOff = (i64)pFS->nPagesize * (i64)(pPg->iPg-1); | 2016 iOff = (i64)pFS->nPagesize * (i64)(pPg->iPg-1); 1985 if( pFS->bUseMmap==0 ){ | 2017 if( pFS->bUseMmap==0 ){ 1986 u8 *aData = pPg->aData - (pPg->flags & PAGE_HASPREV); | 2018 u8 *aData = pPg->aData - (pPg->flags & PAGE_HASPREV); 1987 rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iOff, aData, pFS->nPagesize); | 2019 rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iOff, aData, pFS->nPagesize); 1988 }else if( pPg->flags & PAGE_FREE ){ | 2020 }else if( pPg->flags & PAGE_FREE ){ 1989 fsGrowMapping(pFS, iOff + pFS->nPagesize, &rc); | 2021 fsGrowMapping(pFS, iOff + pFS->nPagesize, &rc); 1990 if( rc==LSM_OK ){ | 2022 if( rc==LSM_OK ){ 1991 u8 *aTo = &((u8 *)(pFS->pMap))[iOff]; | 2023 u8 *aTo = &((u8 *)(pFS->pMap))[iOff]; 1992 u8 *aFrom = pPg->aData - (pPg->flags & PAGE_HASPREV); | 2024 u8 *aFrom = pPg->aData - (pPg->flags & PAGE_HASPREV); 1993 memcpy(aTo, aFrom, pFS->nPagesize); | 2025 memcpy(aTo, aFrom, pFS->nPagesize); 1994 lsmFree(pFS->pEnv, aFrom); | 2026 lsmFree(pFS->pEnv, aFrom); 1995 pPg->aData = aTo + (pPg->flags & PAGE_HASPREV); | 2027 pPg->aData = aTo + (pPg->flags & PAGE_HASPREV); 1996 pPg->flags &= ~PAGE_FREE; | 2028 pPg->flags &= ~PAGE_FREE; 1997 fsPageAddToLru(pFS, pPg); | 2029 fsPageAddToLru(pFS, pPg); 1998 } | 2030 } 1999 } | 2031 } 2000 } | 2032 > 2033 lsmFsFlushWaiting(pFS, &rc); 2001 pPg->flags &= 
~PAGE_DIRTY; | 2034 pPg->flags &= ~PAGE_DIRTY; 2002 pFS->nWrite++; | 2035 pFS->nWrite++; > 2036 } > 2037 } 2003 } 2038 } 2004 2039 2005 return rc; 2040 return rc; 2006 } 2041 } 2007 2042 2008 /* 2043 /* 2009 ** For non-compressed databases, this function is a no-op. For compressed 2044 ** For non-compressed databases, this function is a no-op. For compressed ................................................................................................................................................................................ 2075 ** Release a page-reference obtained using fsPageGet(). 2110 ** Release a page-reference obtained using fsPageGet(). 2076 */ 2111 */ 2077 int lsmFsPageRelease(Page *pPg){ 2112 int lsmFsPageRelease(Page *pPg){ 2078 int rc = LSM_OK; 2113 int rc = LSM_OK; 2079 if( pPg ){ 2114 if( pPg ){ 2080 assert( pPg->nRef>0 ); 2115 assert( pPg->nRef>0 ); 2081 pPg->nRef--; 2116 pPg->nRef--; 2082 if( pPg->nRef==0 && pPg->iPg!=0 ){ | 2117 if( pPg->nRef==0 ){ 2083 FileSystem *pFS = pPg->pFS; 2118 FileSystem *pFS = pPg->pFS; 2084 rc = lsmFsPagePersist(pPg); 2119 rc = lsmFsPagePersist(pPg); 2085 pFS->nOut--; 2120 pFS->nOut--; 2086 2121 2087 assert( pPg->pFS->pCompress 2122 assert( pPg->pFS->pCompress 2088 || fsIsFirst(pPg->pFS, pPg->iPg)==0 2123 || fsIsFirst(pPg->pFS, pPg->iPg)==0 2089 || (pPg->flags & PAGE_HASPREV) 2124 || (pPg->flags & PAGE_HASPREV)
Changes to src/lsm_sorted.c
3404 int iLevel; /* Level of b-tree hierachy to write to */ 3404 int iLevel; /* Level of b-tree hierachy to write to */ 3405 int nData; /* Size of aData[] in bytes */ 3405 int nData; /* Size of aData[] in bytes */ 3406 u8 *aData; /* Page data for level iLevel */ 3406 u8 *aData; /* Page data for level iLevel */ 3407 int iOff; /* Offset on b-tree page to write record to */ 3407 int iOff; /* Offset on b-tree page to write record to */ 3408 int nRec; /* Initial number of records on b-tree page */ 3408 int nRec; /* Initial number of records on b-tree page */ 3409 Pgno iPtr; /* Pointer value to accompany pKey/nKey */ 3409 Pgno iPtr; /* Pointer value to accompany pKey/nKey */ 3410 3410 3411 Hierarchy *p; < 3412 Segment *pSeg; < 3413 < 3414 /* If there exists a b-tree hierarchy and it is not loaded into < 3415 ** memory, load it now. */ < 3416 pSeg = &pMW->pLevel->lhs; < 3417 p = &pMW->hier; < 3418 < 3419 assert( pMW->aSave[0].bStore==0 ); 3411 assert( pMW->aSave[0].bStore==0 ); 3420 assert( pMW->aSave[1].bStore==0 ); 3412 assert( pMW->aSave[1].bStore==0 ); 3421 rc = mergeWorkerBtreeIndirect(pMW); 3413 rc = mergeWorkerBtreeIndirect(pMW); 3422 3414 3423 /* Obtain the absolute pointer value to store along with the key in the 3415 /* Obtain the absolute pointer value to store along with the key in the 3424 ** page body. This pointer points to a page that contains keys that are 3416 ** page body. This pointer points to a page that contains keys that are 3425 ** smaller than pKey/nKey. */ 3417 ** smaller than pKey/nKey. */ ................................................................................................................................................................................ 3838 lsmMCursorClose(pCsr); 3830 lsmMCursorClose(pCsr); 3839 3831 3840 /* Persist and release the output page. */ 3832 /* Persist and release the output page. 
*/ 3841 if( rc==LSM_OK ) rc = mergeWorkerPersistAndRelease(pMW); 3833 if( rc==LSM_OK ) rc = mergeWorkerPersistAndRelease(pMW); 3842 if( rc==LSM_OK ) rc = mergeWorkerBtreeIndirect(pMW); 3834 if( rc==LSM_OK ) rc = mergeWorkerBtreeIndirect(pMW); 3843 if( rc==LSM_OK ) rc = mergeWorkerFinishHierarchy(pMW); 3835 if( rc==LSM_OK ) rc = mergeWorkerFinishHierarchy(pMW); 3844 if( rc==LSM_OK ) rc = mergeWorkerAddPadding(pMW); 3836 if( rc==LSM_OK ) rc = mergeWorkerAddPadding(pMW); > 3837 lsmFsFlushWaiting(pMW->pDb->pFS, &rc); 3845 mergeWorkerReleaseAll(pMW); 3838 mergeWorkerReleaseAll(pMW); 3846 3839 3847 lsmFree(pMW->pDb->pEnv, pMW->aGobble); 3840 lsmFree(pMW->pDb->pEnv, pMW->aGobble); 3848 pMW->aGobble = 0; 3841 pMW->aGobble = 0; 3849 pMW->pCsr = 0; 3842 pMW->pCsr = 0; 3850 3843 3851 *pRc = rc; 3844 *pRc = rc; ................................................................................................................................................................................ 4145 assert( rc!=LSM_OK || pDb->pWorker->freelist.nEntry==0 ); 4138 assert( rc!=LSM_OK || pDb->pWorker->freelist.nEntry==0 ); 4146 lsmDbSnapshotSetLevel(pDb->pWorker, pNext); 4139 lsmDbSnapshotSetLevel(pDb->pWorker, pNext); 4147 sortedFreeLevel(pDb->pEnv, pNew); 4140 sortedFreeLevel(pDb->pEnv, pNew); 4148 }else{ 4141 }else{ 4149 if( pDel ) pDel->iRoot = 0; 4142 if( pDel ) pDel->iRoot = 0; 4150 4143 4151 #if 0 4144 #if 0 4152 lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 0, "new-toplevel"); | 4145 lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 1, "new-toplevel"); 4153 #endif 4146 #endif 4154 4147 4155 if( freelist.nEntry ){ 4148 if( freelist.nEntry ){ 4156 Freelist *p = &pDb->pWorker->freelist; 4149 Freelist *p = &pDb->pWorker->freelist; 4157 lsmFree(pDb->pEnv, p->aEntry); 4150 lsmFree(pDb->pEnv, p->aEntry); 4158 memcpy(p, &freelist, sizeof(freelist)); 4151 memcpy(p, &freelist, sizeof(freelist)); 4159 freelist.aEntry = 0; 4152 freelist.aEntry = 0; 
................................................................................................................................................................................ 4584 /* Clean up the MergeWorker object initialized above. If no error 4577 /* Clean up the MergeWorker object initialized above. If no error 4585 ** has occurred, invoke the work-hook to inform the application that 4578 ** has occurred, invoke the work-hook to inform the application that 4586 ** the database structure has changed. */ 4579 ** the database structure has changed. */ 4587 mergeWorkerShutdown(&mergeworker, &rc); 4580 mergeWorkerShutdown(&mergeworker, &rc); 4588 if( rc==LSM_OK ) sortedInvokeWorkHook(pDb); 4581 if( rc==LSM_OK ) sortedInvokeWorkHook(pDb); 4589 4582 4590 #if 0 4583 #if 0 4591 lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 0, "work"); | 4584 lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 1, "work"); 4592 #endif 4585 #endif 4593 assertBtreeOk(pDb, &pLevel->lhs); 4586 assertBtreeOk(pDb, &pLevel->lhs); 4594 assertRunInOrder(pDb, &pLevel->lhs); 4587 assertRunInOrder(pDb, &pLevel->lhs); 4595 4588 4596 /* If bFlush is true and the database is no longer considered "full", 4589 /* If bFlush is true and the database is no longer considered "full", 4597 ** break out of the loop even if nRemaining is still greater than 4590 ** break out of the loop even if nRemaining is still greater than 4598 ** zero. The caller has an in-memory tree to flush to disk. */ 4591 ** zero. The caller has an in-memory tree to flush to disk. */ ................................................................................................................................................................................ 
5129 5122 5130 aData = fsPageData(pPg, &nData); 5123 aData = fsPageData(pPg, &nData); 5131 nRec = pageGetNRec(aData, nData); 5124 nRec = pageGetNRec(aData, nData); 5132 iPtr = pageGetPtr(aData, nData); 5125 iPtr = pageGetPtr(aData, nData); 5133 flags = pageGetFlags(aData, nData); 5126 flags = pageGetFlags(aData, nData); 5134 5127 5135 lsmStringInit(&str, pDb->pEnv); 5128 lsmStringInit(&str, pDb->pEnv); 5136 lsmStringAppendf(&str, "Page : %d (%d bytes)\n", iPg, nData); | 5129 lsmStringAppendf(&str, "Page : %lld (%d bytes)\n", iPg, nData); 5137 lsmStringAppendf(&str, "nRec : %d\n", nRec); 5130 lsmStringAppendf(&str, "nRec : %d\n", nRec); 5138 lsmStringAppendf(&str, "iPtr : %d\n", iPtr); 5131 lsmStringAppendf(&str, "iPtr : %d\n", iPtr); 5139 lsmStringAppendf(&str, "flags: %04x\n", flags); 5132 lsmStringAppendf(&str, "flags: %04x\n", flags); 5140 lsmStringAppendf(&str, "\n"); 5133 lsmStringAppendf(&str, "\n"); 5141 5134 5142 for(iCell=0; iCell<nRec; iCell++){ 5135 for(iCell=0; iCell<nRec; iCell++){ 5143 int nKey; 5136 int nKey;