Many hyperlinks are disabled. Use anonymous login to enable hyperlinks.
Overview
Comment: | Enhance the log messages emitted when a page conflict is detected. |
---|---|
Downloads: | Tarball | ZIP archive |
Timelines: | family | ancestors | descendants | both | begin-concurrent |
Files: | files | file ages | folders |
SHA3-256: | 92618492b048867af38922825f3d094e |
User & Date: | dan 2017-05-29 14:27:37.696 |
Context
2017-05-29
| ||
19:23 | Instead of a root page number, log the object (table or index) name if a page level locking conflict is detected. (check-in: 9ad846e57b user: dan tags: begin-concurrent) | |
14:27 | Enhance the log messages emitted when a page conflict is detected. (check-in: 92618492b0 user: dan tags: begin-concurrent) | |
2017-05-26
| ||
18:18 | Adjust the bitvec related sqlite3_log messages added by [9527089b]. (check-in: a7e0e7a483 user: dan tags: begin-concurrent) | |
Changes
Changes to src/btree.c.
︙ | ︙ | |||
2285 2286 2287 2288 2289 2290 2291 2292 2293 2294 2295 2296 2297 2298 | getAndInitPage_error: if( pCur ) pCur->iPage--; testcase( pgno==0 ); assert( pgno!=0 || rc==SQLITE_CORRUPT ); return rc; } /* ** Release a MemPage. This should be called once for each prior ** call to btreeGetPage. */ static void releasePageNotNull(MemPage *pPage){ assert( pPage->aData ); assert( pPage->pBt ); | > > > > > > > > > > > | 2285 2286 2287 2288 2289 2290 2291 2292 2293 2294 2295 2296 2297 2298 2299 2300 2301 2302 2303 2304 2305 2306 2307 2308 2309 | getAndInitPage_error: if( pCur ) pCur->iPage--; testcase( pgno==0 ); assert( pgno!=0 || rc==SQLITE_CORRUPT ); return rc; } #ifndef SQLITE_OMIT_CONCURRENT /* ** Set the value of the MemPage.pgnoRoot variable, if it exists. */ static void setMempageRoot(MemPage *pPg, u32 pgnoRoot){ pPg->pgnoRoot = pgnoRoot; } #else # define setMempageRoot(x,y) #endif /* ** Release a MemPage. This should be called once for each prior ** call to btreeGetPage. */ static void releasePageNotNull(MemPage *pPage){ assert( pPage->aData ); assert( pPage->pBt ); |
︙ | ︙ | |||
5218 5219 5220 5221 5222 5223 5224 5225 5226 5227 5228 5229 5230 5231 5232 5233 5234 5235 5236 | ** This function returns SQLITE_CORRUPT if the page-header flags field of ** the new child page does not match the flags field of the parent (i.e. ** if an intkey page appears to be the parent of a non-intkey page, or ** vice-versa). */ static int moveToChild(BtCursor *pCur, u32 newPgno){ BtShared *pBt = pCur->pBt; assert( cursorOwnsBtShared(pCur) ); assert( pCur->eState==CURSOR_VALID ); assert( pCur->iPage<BTCURSOR_MAX_DEPTH ); assert( pCur->iPage>=0 ); if( pCur->iPage>=(BTCURSOR_MAX_DEPTH-1) ){ return SQLITE_CORRUPT_BKPT; } pCur->info.nSize = 0; pCur->curFlags &= ~(BTCF_ValidNKey|BTCF_ValidOvfl); pCur->aiIdx[pCur->iPage++] = pCur->ix; pCur->ix = 0; | > | > > > > | 5229 5230 5231 5232 5233 5234 5235 5236 5237 5238 5239 5240 5241 5242 5243 5244 5245 5246 5247 5248 5249 5250 5251 5252 5253 5254 5255 5256 5257 5258 5259 5260 5261 | ** This function returns SQLITE_CORRUPT if the page-header flags field of ** the new child page does not match the flags field of the parent (i.e. ** if an intkey page appears to be the parent of a non-intkey page, or ** vice-versa). */ static int moveToChild(BtCursor *pCur, u32 newPgno){ BtShared *pBt = pCur->pBt; int rc; assert( cursorOwnsBtShared(pCur) ); assert( pCur->eState==CURSOR_VALID ); assert( pCur->iPage<BTCURSOR_MAX_DEPTH ); assert( pCur->iPage>=0 ); if( pCur->iPage>=(BTCURSOR_MAX_DEPTH-1) ){ return SQLITE_CORRUPT_BKPT; } pCur->info.nSize = 0; pCur->curFlags &= ~(BTCF_ValidNKey|BTCF_ValidOvfl); pCur->aiIdx[pCur->iPage++] = pCur->ix; pCur->ix = 0; rc = getAndInitPage(pBt, newPgno, &pCur->apPage[pCur->iPage], pCur, pCur->curPagerFlags); if( rc==SQLITE_OK ){ setMempageRoot(pCur->apPage[pCur->iPage], pCur->pgnoRoot); } return rc; } #ifdef SQLITE_DEBUG /* ** Page pParent is an internal (non-leaf) tree page. This function ** asserts that page number iChild is the left-child if the iIdx'th ** cell in page pParent. 
Or, if iIdx is equal to the total number of |
︙ | ︙ | |||
5337 5338 5339 5340 5341 5342 5343 5344 5345 5346 5347 5348 5349 5350 | assert( pCur->iPage==(-1) ); rc = getAndInitPage(pCur->pBtree->pBt, pCur->pgnoRoot, &pCur->apPage[0], 0, pCur->curPagerFlags); if( rc!=SQLITE_OK ){ pCur->eState = CURSOR_INVALID; return rc; } pCur->iPage = 0; pCur->curIntKey = pCur->apPage[0]->intKey; } pRoot = pCur->apPage[0]; assert( pRoot->pgno==pCur->pgnoRoot ); /* If pCur->pKeyInfo is not NULL, then the caller that opened this cursor | > | 5353 5354 5355 5356 5357 5358 5359 5360 5361 5362 5363 5364 5365 5366 5367 | assert( pCur->iPage==(-1) ); rc = getAndInitPage(pCur->pBtree->pBt, pCur->pgnoRoot, &pCur->apPage[0], 0, pCur->curPagerFlags); if( rc!=SQLITE_OK ){ pCur->eState = CURSOR_INVALID; return rc; } setMempageRoot(pCur->apPage[0], pCur->pgnoRoot); pCur->iPage = 0; pCur->curIntKey = pCur->apPage[0]->intKey; } pRoot = pCur->apPage[0]; assert( pRoot->pgno==pCur->pgnoRoot ); /* If pCur->pKeyInfo is not NULL, then the caller that opened this cursor |
︙ | ︙ | |||
7496 7497 7498 7499 7500 7501 7502 | ** SQLITE_NOMEM. */ static int balance_nonroot( MemPage *pParent, /* Parent page of siblings being balanced */ int iParentIdx, /* Index of "the page" in pParent */ u8 *aOvflSpace, /* page-size bytes of space for parent ovfl */ int isRoot, /* True if pParent is a root-page */ | | > | 7513 7514 7515 7516 7517 7518 7519 7520 7521 7522 7523 7524 7525 7526 7527 7528 | ** SQLITE_NOMEM. */ static int balance_nonroot( MemPage *pParent, /* Parent page of siblings being balanced */ int iParentIdx, /* Index of "the page" in pParent */ u8 *aOvflSpace, /* page-size bytes of space for parent ovfl */ int isRoot, /* True if pParent is a root-page */ int bBulk, /* True if this call is part of a bulk load */ Pgno pgnoRoot /* Root page of b-tree being balanced */ ){ BtShared *pBt; /* The whole database */ int nMaxCells = 0; /* Allocated size of apCell, szCell, aFrom. */ int nNew = 0; /* Number of pages in apNew[] */ int nOld; /* Number of pages in apOld[] */ int i, j, k; /* Loop counters */ int nxDiv; /* Next divider slot in pParent->aCell[] */ |
︙ | ︙ | |||
7588 7589 7590 7591 7592 7593 7594 7595 7596 7597 7598 7599 7600 7601 | pgno = get4byte(pRight); while( 1 ){ rc = getAndInitPage(pBt, pgno, &apOld[i], 0, 0); if( rc ){ memset(apOld, 0, (i+1)*sizeof(MemPage*)); goto balance_cleanup; } nMaxCells += 1+apOld[i]->nCell+apOld[i]->nOverflow; if( (i--)==0 ) break; if( pParent->nOverflow && i+nxDiv==pParent->aiOvfl[0] ){ apDiv[i] = pParent->apOvfl[0]; pgno = get4byte(apDiv[i]); szNew[i] = pParent->xCellSize(pParent, apDiv[i]); | > > | 7606 7607 7608 7609 7610 7611 7612 7613 7614 7615 7616 7617 7618 7619 7620 7621 | pgno = get4byte(pRight); while( 1 ){ rc = getAndInitPage(pBt, pgno, &apOld[i], 0, 0); if( rc ){ memset(apOld, 0, (i+1)*sizeof(MemPage*)); goto balance_cleanup; } setMempageRoot(apOld[i], pgnoRoot); nMaxCells += 1+apOld[i]->nCell+apOld[i]->nOverflow; if( (i--)==0 ) break; if( pParent->nOverflow && i+nxDiv==pParent->aiOvfl[0] ){ apDiv[i] = pParent->apOvfl[0]; pgno = get4byte(apDiv[i]); szNew[i] = pParent->xCellSize(pParent, apDiv[i]); |
︙ | ︙ | |||
8396 8397 8398 8399 8400 8401 8402 | ** has completed, it is safe to release the pSpace buffer used by ** the previous call, as the overflow cell data will have been ** copied either into the body of a database page or into the new ** pSpace buffer passed to the latter call to balance_nonroot(). */ u8 *pSpace = sqlite3PageMalloc(pCur->pBt->pageSize); rc = balance_nonroot(pParent, iIdx, pSpace, iPage==1, | | | 8416 8417 8418 8419 8420 8421 8422 8423 8424 8425 8426 8427 8428 8429 8430 | ** has completed, it is safe to release the pSpace buffer used by ** the previous call, as the overflow cell data will have been ** copied either into the body of a database page or into the new ** pSpace buffer passed to the latter call to balance_nonroot(). */ u8 *pSpace = sqlite3PageMalloc(pCur->pBt->pageSize); rc = balance_nonroot(pParent, iIdx, pSpace, iPage==1, pCur->hints&BTREE_BULKLOAD, pCur->pgnoRoot); if( pFree ){ /* If pFree is not NULL, it points to the pSpace buffer used ** by a previous call to balance_nonroot(). Its contents are ** now stored either on real database pages or within the ** new pSpace buffer, so it may be safely freed here. */ sqlite3PageFree(pFree); } |
︙ | ︙ | |||
8994 8995 8996 8997 8998 8999 9000 | ** Erase the given database page and all its children. Return ** the page to the freelist. */ static int clearDatabasePage( BtShared *pBt, /* The BTree that contains the table */ Pgno pgno, /* Page number to clear */ int freePageFlag, /* Deallocate page if true */ | | > > | > | > | 9014 9015 9016 9017 9018 9019 9020 9021 9022 9023 9024 9025 9026 9027 9028 9029 9030 9031 9032 9033 9034 9035 9036 9037 9038 9039 9040 9041 9042 9043 9044 9045 9046 9047 9048 9049 9050 9051 9052 9053 9054 9055 9056 9057 9058 9059 9060 9061 9062 9063 | ** Erase the given database page and all its children. Return ** the page to the freelist. */ static int clearDatabasePage( BtShared *pBt, /* The BTree that contains the table */ Pgno pgno, /* Page number to clear */ int freePageFlag, /* Deallocate page if true */ int *pnChange, /* Add number of Cells freed to this counter */ Pgno pgnoRoot ){ MemPage *pPage; int rc; unsigned char *pCell; int i; int hdr; CellInfo info; assert( sqlite3_mutex_held(pBt->mutex) ); if( pgno>btreePagecount(pBt) ){ return SQLITE_CORRUPT_BKPT; } rc = getAndInitPage(pBt, pgno, &pPage, 0, 0); if( rc ) return rc; setMempageRoot(pPage, pgnoRoot); if( pPage->bBusy ){ rc = SQLITE_CORRUPT_BKPT; goto cleardatabasepage_out; } pPage->bBusy = 1; hdr = pPage->hdrOffset; for(i=0; i<pPage->nCell; i++){ pCell = findCell(pPage, i); if( !pPage->leaf ){ rc = clearDatabasePage(pBt, get4byte(pCell), 1, pnChange, pgnoRoot); if( rc ) goto cleardatabasepage_out; } rc = clearCell(pPage, pCell, &info); if( rc ) goto cleardatabasepage_out; } if( !pPage->leaf ){ rc = clearDatabasePage( pBt, get4byte(&pPage->aData[hdr+8]), 1, pnChange, pgnoRoot ); if( rc ) goto cleardatabasepage_out; }else if( pnChange ){ assert( pPage->intKey || CORRUPT_DB ); testcase( !pPage->intKey ); *pnChange += pPage->nCell; } if( freePageFlag ){ |
︙ | ︙ | |||
9070 9071 9072 9073 9074 9075 9076 | rc = saveAllCursors(pBt, (Pgno)iTable, 0); if( SQLITE_OK==rc ){ /* Invalidate all incrblob cursors open on table iTable (assuming iTable ** is the root of a table b-tree - if it is not, the following call is ** a no-op). */ invalidateIncrblobCursors(p, (Pgno)iTable, 0, 1); | | | 9094 9095 9096 9097 9098 9099 9100 9101 9102 9103 9104 9105 9106 9107 9108 | rc = saveAllCursors(pBt, (Pgno)iTable, 0); if( SQLITE_OK==rc ){ /* Invalidate all incrblob cursors open on table iTable (assuming iTable ** is the root of a table b-tree - if it is not, the following call is ** a no-op). */ invalidateIncrblobCursors(p, (Pgno)iTable, 0, 1); rc = clearDatabasePage(pBt, (Pgno)iTable, 0, pnChange, (Pgno)iTable); } sqlite3BtreeLeave(p); return rc; } /* ** Delete all information from the single table that pCur is open on. |
︙ | ︙ |
Changes to src/btreeInt.h.
︙ | ︙ | |||
297 298 299 300 301 302 303 304 305 306 307 308 309 310 | u8 *aData; /* Pointer to disk image of the page data */ u8 *aDataEnd; /* One byte past the end of usable data */ u8 *aCellIdx; /* The cell index area */ u8 *aDataOfst; /* Same as aData for leaves. aData+4 for interior */ DbPage *pDbPage; /* Pager page handle */ u16 (*xCellSize)(MemPage*,u8*); /* cellSizePtr method */ void (*xParseCell)(MemPage*,u8*,CellInfo*); /* btreeParseCell method */ }; /* ** A linked list of the following structures is stored at BtShared.pLock. ** Locks are added (or upgraded from READ_LOCK to WRITE_LOCK) when a cursor ** is opened on the table with root page BtShared.iTable. Locks are removed ** from this list when a transaction is committed or rolled back, or when | > > > | 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 | u8 *aData; /* Pointer to disk image of the page data */ u8 *aDataEnd; /* One byte past the end of usable data */ u8 *aCellIdx; /* The cell index area */ u8 *aDataOfst; /* Same as aData for leaves. aData+4 for interior */ DbPage *pDbPage; /* Pager page handle */ u16 (*xCellSize)(MemPage*,u8*); /* cellSizePtr method */ void (*xParseCell)(MemPage*,u8*,CellInfo*); /* btreeParseCell method */ #ifndef SQLITE_OMIT_CONCURRENT u32 pgnoRoot; /* Root page of b-tree that this page belongs to */ #endif }; /* ** A linked list of the following structures is stored at BtShared.pLock. ** Locks are added (or upgraded from READ_LOCK to WRITE_LOCK) when a cursor ** is opened on the table with root page BtShared.iTable. Locks are removed ** from this list when a transaction is committed or rolled back, or when |
︙ | ︙ |
Changes to src/wal.c.
︙ | ︙ | |||
239 240 241 242 243 244 245 246 247 248 249 250 251 252 | ** When a rollback occurs, the value of K is decreased. Hash table entries ** that correspond to frames greater than the new K value are removed ** from the hash table at this point. */ #ifndef SQLITE_OMIT_WAL #include "wal.h" /* ** Trace output macros */ #if defined(SQLITE_TEST) && defined(SQLITE_DEBUG) int sqlite3WalTrace = 0; # define WALTRACE(X) if(sqlite3WalTrace) sqlite3DebugPrintf X | > | 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 | ** When a rollback occurs, the value of K is decreased. Hash table entries ** that correspond to frames greater than the new K value are removed ** from the hash table at this point. */ #ifndef SQLITE_OMIT_WAL #include "wal.h" #include "btreeInt.h" /* ** Trace output macros */ #if defined(SQLITE_TEST) && defined(SQLITE_DEBUG) int sqlite3WalTrace = 0; # define WALTRACE(X) if(sqlite3WalTrace) sqlite3DebugPrintf X |
︙ | ︙ | |||
2912 2913 2914 2915 2916 2917 2918 | sz = (sz&0xfe00) + ((sz&0x0001)<<16); iOffset = walFrameOffset(i+iZero, sz) + WAL_FRAME_HDRSIZE + 40; rc = sqlite3OsRead(pWal->pWalFd, aNew, sizeof(aNew), iOffset); if( rc==SQLITE_OK && memcmp(aOld, aNew, sizeof(aNew)) ){ rc = SQLITE_BUSY_SNAPSHOT; } }else if( sqlite3BitvecTestNotNull(pAllRead, aPgno[i]) ){ | > > > > > > > > > > | | > > | > > | | > | 2913 2914 2915 2916 2917 2918 2919 2920 2921 2922 2923 2924 2925 2926 2927 2928 2929 2930 2931 2932 2933 2934 2935 2936 2937 2938 2939 2940 2941 2942 2943 2944 2945 2946 | sz = (sz&0xfe00) + ((sz&0x0001)<<16); iOffset = walFrameOffset(i+iZero, sz) + WAL_FRAME_HDRSIZE + 40; rc = sqlite3OsRead(pWal->pWalFd, aNew, sizeof(aNew), iOffset); if( rc==SQLITE_OK && memcmp(aOld, aNew, sizeof(aNew)) ){ rc = SQLITE_BUSY_SNAPSHOT; } }else if( sqlite3BitvecTestNotNull(pAllRead, aPgno[i]) ){ PgHdr *pPg = 0; rc = sqlite3PagerGet(pPage1->pPager, aPgno[i], &pPg, 0); if( rc==SQLITE_OK ){ Pgno pgnoRoot = 0; int bWrite = -1; if( pPg ){ pgnoRoot = ((MemPage*)sqlite3PagerGetExtra(pPg))->pgnoRoot; bWrite = sqlite3PagerIswriteable(pPg); sqlite3PagerUnref(pPg); } sqlite3_log(SQLITE_OK, "cannot commit CONCURRENT transaction " "- conflict at page %d " "(%s page; part of b-tree with root page %d)", (int)aPgno[i], (bWrite==0?"read-only":(bWrite>0?"read/write":"unknown")), (int)pgnoRoot ); rc = SQLITE_BUSY_SNAPSHOT; } }else if( (pPg = sqlite3PagerLookup(pPager, aPgno[i])) ){ /* Page aPgno[i], which is present in the pager cache, has been ** modified since the current CONCURRENT transaction was started. ** However it was not read by the current transaction, so is not ** a conflict. There are two possibilities: (a) the page was ** allocated at the end of the file by the current transaction or ** (b) was present in the cache at the start of the transaction. |
︙ | ︙ |
Added test/concurrent5.test.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 | # 2017 May 26 # # The author disclaims copyright to this source code. In place of # a legal notice, here is a blessing: # # May you do good and not evil. # May you find forgiveness for yourself and forgive others. # May you share freely, never taking more than you give. # #*********************************************************************** # # set testdir [file dirname $argv0] source $testdir/tester.tcl source $testdir/lock_common.tcl source $testdir/wal_common.tcl set ::testprefix concurrent5 ifcapable !concurrent { finish_test return } db close sqlite3_shutdown test_sqlite3_log [list lappend ::log] set ::log [list] sqlite3 db test.db proc do_test_conflict_msg {tn msg} { set msg "cannot commit CONCURRENT transaction - [string trim $msg]" uplevel [list do_test $tn {lindex $::log end} $msg] } do_execsql_test 1.0 { PRAGMA journal_mode = wal; CREATE TABLE t1(x, y); CREATE TABLE t2(c); WITH s(i) AS (SELECT 1 UNION ALL SELECT i+1 FROM s WHERE i<100) INSERT INTO t1 SELECT randomblob(400), randomblob(400) FROM s; } {wal} sqlite3 db2 test.db do_test 1.1.1 { set ::log [list] db2 eval { BEGIN CONCURRENT; SELECT count(*) FROM t1; INSERT INTO t2 VALUES(10); } db eval { INSERT INTO t1 VALUES(randomblob(200), randomblob(200)); } catchsql COMMIT db2 } {1 {database is locked}} do_test_conflict_msg 1.1.2 { conflict at page 2 (read-only page; part of b-tree with root page 2) } do_test 1.2.1 { set ::log [list] db2 eval { ROLLBACK; BEGIN 
CONCURRENT; INSERT INTO t1 VALUES(11, 12); } db eval { INSERT INTO t1 VALUES(12, 11); } catchsql COMMIT db2 } {1 {database is locked}} do_test_conflict_msg 1.2.2 { conflict at page 105 (read/write page; part of b-tree with root page 2) } do_test 1.3.1 { set ::log [list] db2 eval { ROLLBACK; BEGIN CONCURRENT; INSERT INTO t2 VALUES('x'); } db eval { INSERT INTO t2 VALUES('y'); } catchsql COMMIT db2 } {1 {database is locked}} do_test_conflict_msg 1.3.2 { conflict at page 3 (read/write page; part of b-tree with root page 3) } db close db2 close sqlite3_shutdown test_sqlite3_log sqlite3_initialize finish_test |