SQLite

Check-in [92618492b0]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Enhance the log messages emitted when a page conflict is detected.
Downloads: Tarball | ZIP archive
Timelines: family | ancestors | descendants | both | begin-concurrent
Files: files | file ages | folders
SHA3-256: 92618492b048867af38922825f3d094eaaa2dd919b1ed2f7372483cc53f892bf
User & Date: dan 2017-05-29 14:27:37.696
Context
2017-05-29
19:23
Instead of a root page number, log the object (table or index) name if a page level locking conflict is detected. (check-in: 9ad846e57b user: dan tags: begin-concurrent)
14:27
Enhance the log messages emitted when a page conflict is detected. (check-in: 92618492b0 user: dan tags: begin-concurrent)
2017-05-26
18:18
Adjust the bitvec related sqlite3_log messages added by [9527089b]. (check-in: a7e0e7a483 user: dan tags: begin-concurrent)
Changes
Unified Diff Ignore Whitespace Patch
Changes to src/btree.c.
2285
2286
2287
2288
2289
2290
2291











2292
2293
2294
2295
2296
2297
2298
getAndInitPage_error:
  if( pCur ) pCur->iPage--;
  testcase( pgno==0 );
  assert( pgno!=0 || rc==SQLITE_CORRUPT );
  return rc;
}












/*
** Release a MemPage.  This should be called once for each prior
** call to btreeGetPage.
*/
static void releasePageNotNull(MemPage *pPage){
  assert( pPage->aData );
  assert( pPage->pBt );







>
>
>
>
>
>
>
>
>
>
>







2285
2286
2287
2288
2289
2290
2291
2292
2293
2294
2295
2296
2297
2298
2299
2300
2301
2302
2303
2304
2305
2306
2307
2308
2309
getAndInitPage_error:
  if( pCur ) pCur->iPage--;
  testcase( pgno==0 );
  assert( pgno!=0 || rc==SQLITE_CORRUPT );
  return rc;
}

#ifndef SQLITE_OMIT_CONCURRENT
/* 
** Set the value of the MemPage.pgnoRoot variable, if it exists.
*/
static void setMempageRoot(MemPage *pPg, u32 pgnoRoot){
  pPg->pgnoRoot = pgnoRoot;
}
#else
# define setMempageRoot(x,y)
#endif

/*
** Release a MemPage.  This should be called once for each prior
** call to btreeGetPage.
*/
static void releasePageNotNull(MemPage *pPage){
  assert( pPage->aData );
  assert( pPage->pBt );
5218
5219
5220
5221
5222
5223
5224

5225
5226
5227
5228
5229
5230
5231
5232
5233
5234
5235
5236
5237
5238




5239
5240
5241
5242
5243
5244
5245
** This function returns SQLITE_CORRUPT if the page-header flags field of
** the new child page does not match the flags field of the parent (i.e.
** if an intkey page appears to be the parent of a non-intkey page, or
** vice-versa).
*/
static int moveToChild(BtCursor *pCur, u32 newPgno){
  BtShared *pBt = pCur->pBt;


  assert( cursorOwnsBtShared(pCur) );
  assert( pCur->eState==CURSOR_VALID );
  assert( pCur->iPage<BTCURSOR_MAX_DEPTH );
  assert( pCur->iPage>=0 );
  if( pCur->iPage>=(BTCURSOR_MAX_DEPTH-1) ){
    return SQLITE_CORRUPT_BKPT;
  }
  pCur->info.nSize = 0;
  pCur->curFlags &= ~(BTCF_ValidNKey|BTCF_ValidOvfl);
  pCur->aiIdx[pCur->iPage++] = pCur->ix;
  pCur->ix = 0;
  return getAndInitPage(pBt, newPgno, &pCur->apPage[pCur->iPage],
                        pCur, pCur->curPagerFlags);




}

#ifdef SQLITE_DEBUG
/*
** Page pParent is an internal (non-leaf) tree page. This function 
** asserts that page number iChild is the left-child if the iIdx'th
** cell in page pParent. Or, if iIdx is equal to the total number of







>












|

>
>
>
>







5229
5230
5231
5232
5233
5234
5235
5236
5237
5238
5239
5240
5241
5242
5243
5244
5245
5246
5247
5248
5249
5250
5251
5252
5253
5254
5255
5256
5257
5258
5259
5260
5261
** This function returns SQLITE_CORRUPT if the page-header flags field of
** the new child page does not match the flags field of the parent (i.e.
** if an intkey page appears to be the parent of a non-intkey page, or
** vice-versa).
*/
static int moveToChild(BtCursor *pCur, u32 newPgno){
  BtShared *pBt = pCur->pBt;
  int rc;

  assert( cursorOwnsBtShared(pCur) );
  assert( pCur->eState==CURSOR_VALID );
  assert( pCur->iPage<BTCURSOR_MAX_DEPTH );
  assert( pCur->iPage>=0 );
  if( pCur->iPage>=(BTCURSOR_MAX_DEPTH-1) ){
    return SQLITE_CORRUPT_BKPT;
  }
  pCur->info.nSize = 0;
  pCur->curFlags &= ~(BTCF_ValidNKey|BTCF_ValidOvfl);
  pCur->aiIdx[pCur->iPage++] = pCur->ix;
  pCur->ix = 0;
  rc = getAndInitPage(pBt, newPgno, &pCur->apPage[pCur->iPage],
                        pCur, pCur->curPagerFlags);
  if( rc==SQLITE_OK ){
    setMempageRoot(pCur->apPage[pCur->iPage], pCur->pgnoRoot);
  }
  return rc;
}

#ifdef SQLITE_DEBUG
/*
** Page pParent is an internal (non-leaf) tree page. This function 
** asserts that page number iChild is the left-child if the iIdx'th
** cell in page pParent. Or, if iIdx is equal to the total number of
5337
5338
5339
5340
5341
5342
5343

5344
5345
5346
5347
5348
5349
5350
    assert( pCur->iPage==(-1) );
    rc = getAndInitPage(pCur->pBtree->pBt, pCur->pgnoRoot, &pCur->apPage[0],
                        0, pCur->curPagerFlags);
    if( rc!=SQLITE_OK ){
      pCur->eState = CURSOR_INVALID;
       return rc;
    }

    pCur->iPage = 0;
    pCur->curIntKey = pCur->apPage[0]->intKey;
  }
  pRoot = pCur->apPage[0];
  assert( pRoot->pgno==pCur->pgnoRoot );

  /* If pCur->pKeyInfo is not NULL, then the caller that opened this cursor







>







5353
5354
5355
5356
5357
5358
5359
5360
5361
5362
5363
5364
5365
5366
5367
    assert( pCur->iPage==(-1) );
    rc = getAndInitPage(pCur->pBtree->pBt, pCur->pgnoRoot, &pCur->apPage[0],
                        0, pCur->curPagerFlags);
    if( rc!=SQLITE_OK ){
      pCur->eState = CURSOR_INVALID;
       return rc;
    }
    setMempageRoot(pCur->apPage[0], pCur->pgnoRoot);
    pCur->iPage = 0;
    pCur->curIntKey = pCur->apPage[0]->intKey;
  }
  pRoot = pCur->apPage[0];
  assert( pRoot->pgno==pCur->pgnoRoot );

  /* If pCur->pKeyInfo is not NULL, then the caller that opened this cursor
7496
7497
7498
7499
7500
7501
7502
7503

7504
7505
7506
7507
7508
7509
7510
** SQLITE_NOMEM.
*/
static int balance_nonroot(
  MemPage *pParent,               /* Parent page of siblings being balanced */
  int iParentIdx,                 /* Index of "the page" in pParent */
  u8 *aOvflSpace,                 /* page-size bytes of space for parent ovfl */
  int isRoot,                     /* True if pParent is a root-page */
  int bBulk                       /* True if this call is part of a bulk load */

){
  BtShared *pBt;               /* The whole database */
  int nMaxCells = 0;           /* Allocated size of apCell, szCell, aFrom. */
  int nNew = 0;                /* Number of pages in apNew[] */
  int nOld;                    /* Number of pages in apOld[] */
  int i, j, k;                 /* Loop counters */
  int nxDiv;                   /* Next divider slot in pParent->aCell[] */







|
>







7513
7514
7515
7516
7517
7518
7519
7520
7521
7522
7523
7524
7525
7526
7527
7528
** SQLITE_NOMEM.
*/
static int balance_nonroot(
  MemPage *pParent,               /* Parent page of siblings being balanced */
  int iParentIdx,                 /* Index of "the page" in pParent */
  u8 *aOvflSpace,                 /* page-size bytes of space for parent ovfl */
  int isRoot,                     /* True if pParent is a root-page */
  int bBulk,                      /* True if this call is part of a bulk load */
  Pgno pgnoRoot                   /* Root page of b-tree being balanced */
){
  BtShared *pBt;               /* The whole database */
  int nMaxCells = 0;           /* Allocated size of apCell, szCell, aFrom. */
  int nNew = 0;                /* Number of pages in apNew[] */
  int nOld;                    /* Number of pages in apOld[] */
  int i, j, k;                 /* Loop counters */
  int nxDiv;                   /* Next divider slot in pParent->aCell[] */
7588
7589
7590
7591
7592
7593
7594


7595
7596
7597
7598
7599
7600
7601
  pgno = get4byte(pRight);
  while( 1 ){
    rc = getAndInitPage(pBt, pgno, &apOld[i], 0, 0);
    if( rc ){
      memset(apOld, 0, (i+1)*sizeof(MemPage*));
      goto balance_cleanup;
    }


    nMaxCells += 1+apOld[i]->nCell+apOld[i]->nOverflow;
    if( (i--)==0 ) break;

    if( pParent->nOverflow && i+nxDiv==pParent->aiOvfl[0] ){
      apDiv[i] = pParent->apOvfl[0];
      pgno = get4byte(apDiv[i]);
      szNew[i] = pParent->xCellSize(pParent, apDiv[i]);







>
>







7606
7607
7608
7609
7610
7611
7612
7613
7614
7615
7616
7617
7618
7619
7620
7621
  pgno = get4byte(pRight);
  while( 1 ){
    rc = getAndInitPage(pBt, pgno, &apOld[i], 0, 0);
    if( rc ){
      memset(apOld, 0, (i+1)*sizeof(MemPage*));
      goto balance_cleanup;
    }
    setMempageRoot(apOld[i], pgnoRoot);

    nMaxCells += 1+apOld[i]->nCell+apOld[i]->nOverflow;
    if( (i--)==0 ) break;

    if( pParent->nOverflow && i+nxDiv==pParent->aiOvfl[0] ){
      apDiv[i] = pParent->apOvfl[0];
      pgno = get4byte(apDiv[i]);
      szNew[i] = pParent->xCellSize(pParent, apDiv[i]);
8396
8397
8398
8399
8400
8401
8402
8403
8404
8405
8406
8407
8408
8409
8410
          ** has completed, it is safe to release the pSpace buffer used by
          ** the previous call, as the overflow cell data will have been 
          ** copied either into the body of a database page or into the new
          ** pSpace buffer passed to the latter call to balance_nonroot().
          */
          u8 *pSpace = sqlite3PageMalloc(pCur->pBt->pageSize);
          rc = balance_nonroot(pParent, iIdx, pSpace, iPage==1,
                               pCur->hints&BTREE_BULKLOAD);
          if( pFree ){
            /* If pFree is not NULL, it points to the pSpace buffer used 
            ** by a previous call to balance_nonroot(). Its contents are
            ** now stored either on real database pages or within the 
            ** new pSpace buffer, so it may be safely freed here. */
            sqlite3PageFree(pFree);
          }







|







8416
8417
8418
8419
8420
8421
8422
8423
8424
8425
8426
8427
8428
8429
8430
          ** has completed, it is safe to release the pSpace buffer used by
          ** the previous call, as the overflow cell data will have been 
          ** copied either into the body of a database page or into the new
          ** pSpace buffer passed to the latter call to balance_nonroot().
          */
          u8 *pSpace = sqlite3PageMalloc(pCur->pBt->pageSize);
          rc = balance_nonroot(pParent, iIdx, pSpace, iPage==1,
                               pCur->hints&BTREE_BULKLOAD, pCur->pgnoRoot);
          if( pFree ){
            /* If pFree is not NULL, it points to the pSpace buffer used 
            ** by a previous call to balance_nonroot(). Its contents are
            ** now stored either on real database pages or within the 
            ** new pSpace buffer, so it may be safely freed here. */
            sqlite3PageFree(pFree);
          }
8994
8995
8996
8997
8998
8999
9000
9001

9002
9003
9004
9005
9006
9007
9008
9009
9010
9011
9012
9013
9014
9015

9016
9017
9018
9019
9020
9021
9022
9023
9024
9025
9026
9027
9028
9029
9030
9031

9032

9033
9034
9035
9036
9037
9038
9039
** Erase the given database page and all its children.  Return
** the page to the freelist.
*/
static int clearDatabasePage(
  BtShared *pBt,           /* The BTree that contains the table */
  Pgno pgno,               /* Page number to clear */
  int freePageFlag,        /* Deallocate page if true */
  int *pnChange            /* Add number of Cells freed to this counter */

){
  MemPage *pPage;
  int rc;
  unsigned char *pCell;
  int i;
  int hdr;
  CellInfo info;

  assert( sqlite3_mutex_held(pBt->mutex) );
  if( pgno>btreePagecount(pBt) ){
    return SQLITE_CORRUPT_BKPT;
  }
  rc = getAndInitPage(pBt, pgno, &pPage, 0, 0);
  if( rc ) return rc;

  if( pPage->bBusy ){
    rc = SQLITE_CORRUPT_BKPT;
    goto cleardatabasepage_out;
  }
  pPage->bBusy = 1;
  hdr = pPage->hdrOffset;
  for(i=0; i<pPage->nCell; i++){
    pCell = findCell(pPage, i);
    if( !pPage->leaf ){
      rc = clearDatabasePage(pBt, get4byte(pCell), 1, pnChange);
      if( rc ) goto cleardatabasepage_out;
    }
    rc = clearCell(pPage, pCell, &info);
    if( rc ) goto cleardatabasepage_out;
  }
  if( !pPage->leaf ){

    rc = clearDatabasePage(pBt, get4byte(&pPage->aData[hdr+8]), 1, pnChange);

    if( rc ) goto cleardatabasepage_out;
  }else if( pnChange ){
    assert( pPage->intKey || CORRUPT_DB );
    testcase( !pPage->intKey );
    *pnChange += pPage->nCell;
  }
  if( freePageFlag ){







|
>














>









|






>
|
>







9014
9015
9016
9017
9018
9019
9020
9021
9022
9023
9024
9025
9026
9027
9028
9029
9030
9031
9032
9033
9034
9035
9036
9037
9038
9039
9040
9041
9042
9043
9044
9045
9046
9047
9048
9049
9050
9051
9052
9053
9054
9055
9056
9057
9058
9059
9060
9061
9062
9063
** Erase the given database page and all its children.  Return
** the page to the freelist.
*/
static int clearDatabasePage(
  BtShared *pBt,           /* The BTree that contains the table */
  Pgno pgno,               /* Page number to clear */
  int freePageFlag,        /* Deallocate page if true */
  int *pnChange,           /* Add number of Cells freed to this counter */
  Pgno pgnoRoot
){
  MemPage *pPage;
  int rc;
  unsigned char *pCell;
  int i;
  int hdr;
  CellInfo info;

  assert( sqlite3_mutex_held(pBt->mutex) );
  if( pgno>btreePagecount(pBt) ){
    return SQLITE_CORRUPT_BKPT;
  }
  rc = getAndInitPage(pBt, pgno, &pPage, 0, 0);
  if( rc ) return rc;
  setMempageRoot(pPage, pgnoRoot);
  if( pPage->bBusy ){
    rc = SQLITE_CORRUPT_BKPT;
    goto cleardatabasepage_out;
  }
  pPage->bBusy = 1;
  hdr = pPage->hdrOffset;
  for(i=0; i<pPage->nCell; i++){
    pCell = findCell(pPage, i);
    if( !pPage->leaf ){
      rc = clearDatabasePage(pBt, get4byte(pCell), 1, pnChange, pgnoRoot);
      if( rc ) goto cleardatabasepage_out;
    }
    rc = clearCell(pPage, pCell, &info);
    if( rc ) goto cleardatabasepage_out;
  }
  if( !pPage->leaf ){
    rc = clearDatabasePage(
        pBt, get4byte(&pPage->aData[hdr+8]), 1, pnChange, pgnoRoot
    );
    if( rc ) goto cleardatabasepage_out;
  }else if( pnChange ){
    assert( pPage->intKey || CORRUPT_DB );
    testcase( !pPage->intKey );
    *pnChange += pPage->nCell;
  }
  if( freePageFlag ){
9070
9071
9072
9073
9074
9075
9076
9077
9078
9079
9080
9081
9082
9083
9084
  rc = saveAllCursors(pBt, (Pgno)iTable, 0);

  if( SQLITE_OK==rc ){
    /* Invalidate all incrblob cursors open on table iTable (assuming iTable
    ** is the root of a table b-tree - if it is not, the following call is
    ** a no-op).  */
    invalidateIncrblobCursors(p, (Pgno)iTable, 0, 1);
    rc = clearDatabasePage(pBt, (Pgno)iTable, 0, pnChange);
  }
  sqlite3BtreeLeave(p);
  return rc;
}

/*
** Delete all information from the single table that pCur is open on.







|







9094
9095
9096
9097
9098
9099
9100
9101
9102
9103
9104
9105
9106
9107
9108
  rc = saveAllCursors(pBt, (Pgno)iTable, 0);

  if( SQLITE_OK==rc ){
    /* Invalidate all incrblob cursors open on table iTable (assuming iTable
    ** is the root of a table b-tree - if it is not, the following call is
    ** a no-op).  */
    invalidateIncrblobCursors(p, (Pgno)iTable, 0, 1);
    rc = clearDatabasePage(pBt, (Pgno)iTable, 0, pnChange, (Pgno)iTable);
  }
  sqlite3BtreeLeave(p);
  return rc;
}

/*
** Delete all information from the single table that pCur is open on.
Changes to src/btreeInt.h.
297
298
299
300
301
302
303



304
305
306
307
308
309
310
  u8 *aData;           /* Pointer to disk image of the page data */
  u8 *aDataEnd;        /* One byte past the end of usable data */
  u8 *aCellIdx;        /* The cell index area */
  u8 *aDataOfst;       /* Same as aData for leaves.  aData+4 for interior */
  DbPage *pDbPage;     /* Pager page handle */
  u16 (*xCellSize)(MemPage*,u8*);             /* cellSizePtr method */
  void (*xParseCell)(MemPage*,u8*,CellInfo*); /* btreeParseCell method */



};

/*
** A linked list of the following structures is stored at BtShared.pLock.
** Locks are added (or upgraded from READ_LOCK to WRITE_LOCK) when a cursor 
** is opened on the table with root page BtShared.iTable. Locks are removed
** from this list when a transaction is committed or rolled back, or when







>
>
>







297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
  u8 *aData;           /* Pointer to disk image of the page data */
  u8 *aDataEnd;        /* One byte past the end of usable data */
  u8 *aCellIdx;        /* The cell index area */
  u8 *aDataOfst;       /* Same as aData for leaves.  aData+4 for interior */
  DbPage *pDbPage;     /* Pager page handle */
  u16 (*xCellSize)(MemPage*,u8*);             /* cellSizePtr method */
  void (*xParseCell)(MemPage*,u8*,CellInfo*); /* btreeParseCell method */
#ifndef SQLITE_OMIT_CONCURRENT
  u32 pgnoRoot;        /* Root page of b-tree that this page belongs to */
#endif
};

/*
** A linked list of the following structures is stored at BtShared.pLock.
** Locks are added (or upgraded from READ_LOCK to WRITE_LOCK) when a cursor 
** is opened on the table with root page BtShared.iTable. Locks are removed
** from this list when a transaction is committed or rolled back, or when
Changes to src/wal.c.
239
240
241
242
243
244
245

246
247
248
249
250
251
252
** When a rollback occurs, the value of K is decreased. Hash table entries
** that correspond to frames greater than the new K value are removed
** from the hash table at this point.
*/
#ifndef SQLITE_OMIT_WAL

#include "wal.h"


/*
** Trace output macros
*/
#if defined(SQLITE_TEST) && defined(SQLITE_DEBUG)
int sqlite3WalTrace = 0;
# define WALTRACE(X)  if(sqlite3WalTrace) sqlite3DebugPrintf X







>







239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
** When a rollback occurs, the value of K is decreased. Hash table entries
** that correspond to frames greater than the new K value are removed
** from the hash table at this point.
*/
#ifndef SQLITE_OMIT_WAL

#include "wal.h"
#include "btreeInt.h"

/*
** Trace output macros
*/
#if defined(SQLITE_TEST) && defined(SQLITE_DEBUG)
int sqlite3WalTrace = 0;
# define WALTRACE(X)  if(sqlite3WalTrace) sqlite3DebugPrintf X
2912
2913
2914
2915
2916
2917
2918










2919
2920


2921


2922
2923

2924
2925
2926
2927
2928
2929
2930
              sz = (sz&0xfe00) + ((sz&0x0001)<<16);
              iOffset = walFrameOffset(i+iZero, sz) + WAL_FRAME_HDRSIZE + 40;
              rc = sqlite3OsRead(pWal->pWalFd, aNew, sizeof(aNew), iOffset);
              if( rc==SQLITE_OK && memcmp(aOld, aNew, sizeof(aNew)) ){
                rc = SQLITE_BUSY_SNAPSHOT;
              }
            }else if( sqlite3BitvecTestNotNull(pAllRead, aPgno[i]) ){










              sqlite3_log(SQLITE_OK,
                  "cannot commit CONCURRENT transaction (conflict at page %d)",


                  (int)aPgno[i]


              );
              rc = SQLITE_BUSY_SNAPSHOT;

            }else if( (pPg = sqlite3PagerLookup(pPager, aPgno[i])) ){
              /* Page aPgno[i], which is present in the pager cache, has been
              ** modified since the current CONCURRENT transaction was started.
              ** However it was not read by the current transaction, so is not
              ** a conflict. There are two possibilities: (a) the page was
              ** allocated at the of the file by the current transaction or 
              ** (b) was present in the cache at the start of the transaction.







>
>
>
>
>
>
>
>
>
>
|
|
>
>
|
>
>
|
|
>







2913
2914
2915
2916
2917
2918
2919
2920
2921
2922
2923
2924
2925
2926
2927
2928
2929
2930
2931
2932
2933
2934
2935
2936
2937
2938
2939
2940
2941
2942
2943
2944
2945
2946
              sz = (sz&0xfe00) + ((sz&0x0001)<<16);
              iOffset = walFrameOffset(i+iZero, sz) + WAL_FRAME_HDRSIZE + 40;
              rc = sqlite3OsRead(pWal->pWalFd, aNew, sizeof(aNew), iOffset);
              if( rc==SQLITE_OK && memcmp(aOld, aNew, sizeof(aNew)) ){
                rc = SQLITE_BUSY_SNAPSHOT;
              }
            }else if( sqlite3BitvecTestNotNull(pAllRead, aPgno[i]) ){
              PgHdr *pPg = 0;
              rc = sqlite3PagerGet(pPage1->pPager, aPgno[i], &pPg, 0);
              if( rc==SQLITE_OK ){
                Pgno pgnoRoot = 0;
                int bWrite = -1;
                if( pPg ){
                  pgnoRoot = ((MemPage*)sqlite3PagerGetExtra(pPg))->pgnoRoot;
                  bWrite = sqlite3PagerIswriteable(pPg);
                  sqlite3PagerUnref(pPg);
                }
                sqlite3_log(SQLITE_OK,
                  "cannot commit CONCURRENT transaction "
                  "- conflict at page %d "
                  "(%s page; part of b-tree with root page %d)",
                  (int)aPgno[i], 
                  (bWrite==0?"read-only":(bWrite>0?"read/write":"unknown")),
                  (int)pgnoRoot
                );
                rc = SQLITE_BUSY_SNAPSHOT;
              }
            }else if( (pPg = sqlite3PagerLookup(pPager, aPgno[i])) ){
              /* Page aPgno[i], which is present in the pager cache, has been
              ** modified since the current CONCURRENT transaction was started.
              ** However it was not read by the current transaction, so is not
              ** a conflict. There are two possibilities: (a) the page was
              ** allocated at the of the file by the current transaction or 
              ** (b) was present in the cache at the start of the transaction.
Added test/concurrent5.test.


























































































































































































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# 2017 May 26
#
# The author disclaims copyright to this source code.  In place of
# a legal notice, here is a blessing:
#
#    May you do good and not evil.
#    May you find forgiveness for yourself and forgive others.
#    May you share freely, never taking more than you give.
#
#***********************************************************************
#
#

set testdir [file dirname $argv0]
source $testdir/tester.tcl
source $testdir/lock_common.tcl
source $testdir/wal_common.tcl
set ::testprefix concurrent5

ifcapable !concurrent {
  finish_test
  return
}

db close
sqlite3_shutdown
test_sqlite3_log [list lappend ::log]
set ::log [list]

sqlite3 db test.db

proc do_test_conflict_msg {tn msg} {
  set msg "cannot commit CONCURRENT transaction - [string trim $msg]"
  uplevel [list do_test $tn {lindex $::log end} $msg]
}

do_execsql_test 1.0 {
  PRAGMA journal_mode = wal;
  CREATE TABLE t1(x, y);
  CREATE TABLE t2(c);
  WITH s(i) AS (SELECT 1 UNION ALL SELECT i+1 FROM s WHERE i<100)
  INSERT INTO t1 SELECT randomblob(400), randomblob(400) FROM s;
} {wal}

sqlite3 db2 test.db

do_test 1.1.1 {
  set ::log [list]
  db2 eval {
    BEGIN CONCURRENT;
      SELECT count(*) FROM t1;
      INSERT INTO t2 VALUES(10);
  }

  db eval {
    INSERT INTO t1 VALUES(randomblob(200), randomblob(200));
  }

  catchsql COMMIT db2
} {1 {database is locked}}
do_test_conflict_msg 1.1.2 {
  conflict at page 2 (read-only page; part of b-tree with root page 2)
}

do_test 1.2.1 {
  set ::log [list]
  db2 eval {
    ROLLBACK;
    BEGIN CONCURRENT;
      INSERT INTO t1 VALUES(11, 12);
  }

  db eval {
    INSERT INTO t1 VALUES(12, 11);
  }

  catchsql COMMIT db2
} {1 {database is locked}}

do_test_conflict_msg 1.2.2 {
  conflict at page 105 (read/write page; part of b-tree with root page 2)
}

do_test 1.3.1 {
  set ::log [list]
  db2 eval {
    ROLLBACK;
    BEGIN CONCURRENT;
      INSERT INTO t2 VALUES('x');
  }

  db eval {
    INSERT INTO t2 VALUES('y');
  }

  catchsql COMMIT db2
} {1 {database is locked}}

do_test_conflict_msg 1.3.2 {
  conflict at page 3 (read/write page; part of b-tree with root page 3)
}

db close
db2 close
sqlite3_shutdown
test_sqlite3_log 
sqlite3_initialize
finish_test