/ Check-in [4460764e]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment: Fix another problem involving unlocked transactions and wal-file restarts.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | begin-concurrent
Files: files | file ages | folders
SHA1: 4460764ea8fc948fe02f0a09476857839b3aa1ae
User & Date: dan 2015-08-24 06:43:25
Wiki:begin-concurrent
Context
2015-08-24
06:44
Merge trunk changes with this branch. check-in: 876810c2 user: dan tags: begin-concurrent
06:43
Fix another problem involving unlocked transactions and wal-file restarts. check-in: 4460764e user: dan tags: begin-concurrent
2015-08-22
20:32
Fix a problem to do with detecting unlocked transaction conflicts if another client restarts the wal while the transaction is running. check-in: e3968b25 user: dan tags: begin-concurrent
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to src/pager.c.

6125
6126
6127
6128
6129
6130
6131
6132

6133

6134
6135
6136
6137
6138
6139
6140
}

/*
** Upgrade the wal snapshot used by pPager and then reload page 1.
**
** sqlite3WalUpgradeSnapshot() is invoked on the pager's Wal object, after
** which the wal frame for page 1 is looked up with sqlite3WalFindFrame()
** and handed to readDbPage() so that pPage1 is re-read.
**
** The pager must have a wal handle and pPager->pAllRead must be set (both
** are asserted). Returns SQLITE_OK on success, or the error code reported
** by sqlite3WalFindFrame() or readDbPage().
*/
int sqlite3PagerUpgradeSnapshot(Pager *pPager, DbPage *pPage1){
  u32 iPg1Frame = 0;              /* Frame reported for page 1 (starts at 0) */
  int rc;                         /* Return code */

  assert( pPager->pWal && pPager->pAllRead );
  sqlite3WalUpgradeSnapshot(pPager->pWal);

  /* Locate page 1 in the upgraded snapshot; stop on the first error. */
  rc = sqlite3WalFindFrame(pPager->pWal, 1, &iPg1Frame);
  if( rc!=SQLITE_OK ) return rc;

  return readDbPage(pPage1, iPg1Frame);
}








|
>
|
>







6125
6126
6127
6128
6129
6130
6131
6132
6133
6134
6135
6136
6137
6138
6139
6140
6141
6142
}

/*
** Upgrade the wal snapshot used by pPager and then reload page 1.
**
** The steps run in order and the first failure is returned unchanged:
**
**   1. sqlite3WalUpgradeSnapshot() on the pager's Wal object,
**   2. sqlite3WalFindFrame() to look up the wal frame for page 1,
**   3. readDbPage() to re-read pPage1 using that frame.
**
** The pager must have a wal handle and pPager->pAllRead must be set (both
** are asserted). Returns SQLITE_OK if all three steps succeed.
*/
int sqlite3PagerUpgradeSnapshot(Pager *pPager, DbPage *pPage1){
  u32 iPg1Frame = 0;              /* Frame reported for page 1 (starts at 0) */
  int rc;                         /* Return code */

  assert( pPager->pWal && pPager->pAllRead );

  rc = sqlite3WalUpgradeSnapshot(pPager->pWal);
  if( rc!=SQLITE_OK ) return rc;

  rc = sqlite3WalFindFrame(pPager->pWal, 1, &iPg1Frame);
  if( rc!=SQLITE_OK ) return rc;

  return readDbPage(pPage1, iPg1Frame);
}

Changes to src/wal.c.

2663
2664
2665
2666
2667
2668
2669

























2670

2671
2672









2673
2674
2675
2676
2677
2678
2679
....
2785
2786
2787
2788
2789
2790
2791
2792
2793
2794
2795
2796
2797
2798
2799
....
2810
2811
2812
2813
2814
2815
2816
2817

2818
2819
2820
2821
2822
2823
2824
2825
2826
2827
2828
2829
2830
2831
2832
2833
2834
      }
    }
  }

  return rc;
}


























/*
** Replace the private header copy held in pWal->hdr with the header
** currently returned by walIndexHdr(). The caller must already hold the
** wal write-lock (asserted).
*/
void sqlite3WalUpgradeSnapshot(Wal *pWal){
  void *pCurrentHdr;              /* Header as returned by walIndexHdr() */

  assert( pWal->writeLock );
  pCurrentHdr = (void*)walIndexHdr(pWal);
  memcpy(&pWal->hdr, pCurrentHdr, sizeof(WalIndexHdr));
}

/*
** End a write transaction.  The commit has already been done.  This
** routine merely releases the lock.
*/
int sqlite3WalEndWriteTransaction(Wal *pWal){
................................................................................
**
** SQLITE_OK is returned if no error is encountered (regardless of whether
** or not pWal->hdr.mxFrame is modified). An SQLite error code is returned
** if an error occurs.
*/
static int walRestartLog(Wal *pWal){
  int rc = SQLITE_OK;
  int cnt;

  if( pWal->readLock==0 ){
    volatile WalCkptInfo *pInfo = walCkptInfo(pWal);
    assert( pInfo->nBackfill==pWal->hdr.mxFrame );
    if( pInfo->nBackfill>0 ){
      u32 salt1;
      sqlite3_randomness(4, &salt1);
................................................................................
        ** to handle if this transaction is rolled back.  */
        walRestartHdr(pWal, salt1);
        walUnlockExclusive(pWal, WAL_READ_LOCK(1), WAL_NREADER-1);
      }else if( rc!=SQLITE_BUSY ){
        return rc;
      }
    }
    walUnlockShared(pWal, WAL_READ_LOCK(0));

    pWal->readLock = -1;
    cnt = 0;
    do{
      int notUsed;
      rc = walTryBeginRead(pWal, &notUsed, 1, ++cnt);
    }while( rc==WAL_RETRY );
    assert( (rc&0xff)!=SQLITE_BUSY ); /* BUSY not possible when useWal==1 */
    testcase( (rc&0xff)==SQLITE_IOERR );
    testcase( rc==SQLITE_PROTOCOL );
    testcase( rc==SQLITE_OK );
  }
  return rc;
}

/*
** Information about the current state of the WAL file and where
** the next fsync should occur - passed from sqlite3WalFrames() into







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
>


>
>
>
>
>
>
>
>
>







 







<







 







<
>
|
|
|
|
|
|
|
|
|
<







2663
2664
2665
2666
2667
2668
2669
2670
2671
2672
2673
2674
2675
2676
2677
2678
2679
2680
2681
2682
2683
2684
2685
2686
2687
2688
2689
2690
2691
2692
2693
2694
2695
2696
2697
2698
2699
2700
2701
2702
2703
2704
2705
2706
2707
2708
2709
2710
2711
2712
2713
2714
....
2820
2821
2822
2823
2824
2825
2826

2827
2828
2829
2830
2831
2832
2833
....
2844
2845
2846
2847
2848
2849
2850

2851
2852
2853
2854
2855
2856
2857
2858
2859
2860

2861
2862
2863
2864
2865
2866
2867
      }
    }
  }

  return rc;
}

/*
** The caller is a writer whose read-lock is on slot aReadmark[0]
** (pWal->readLock==0); both conditions are asserted. Drop that shared
** lock and obtain a read-lock through walTryBeginRead() instead, retrying
** for as long as it reports WAL_RETRY.
**
** Returns SQLITE_OK on success, or an SQLite error code otherwise.
*/
static int walUpgradeReadlock(Wal *pWal){
  int nAttempt = 0;               /* Number of walTryBeginRead() calls so far */
  int unusedChanged;              /* Output of walTryBeginRead(); ignored */
  int rc;                         /* Return code */

  assert( pWal->writeLock && pWal->readLock==0 );

  /* Give up the lock on slot 0 and mark the handle as holding no read-lock. */
  walUnlockShared(pWal, WAL_READ_LOCK(0));
  pWal->readLock = -1;

  /* The attempt counter passed to walTryBeginRead() starts at 1. */
  for(;;){
    rc = walTryBeginRead(pWal, &unusedChanged, 1, ++nAttempt);
    if( rc!=WAL_RETRY ) break;
  }

  assert( (rc&0xff)!=SQLITE_BUSY ); /* BUSY not possible when useWal==1 */
  testcase( (rc&0xff)==SQLITE_IOERR );
  testcase( rc==SQLITE_PROTOCOL );
  testcase( rc==SQLITE_OK );
  return rc;
}

/*
** Replace the private header copy held in pWal->hdr with the header
** currently returned by walIndexHdr(), then make sure the read-lock held
** by this client is compatible with the new snapshot. The caller must
** already hold the wal write-lock (asserted).
**
** Returns SQLITE_OK, or the error code from walUpgradeReadlock() if the
** read-lock had to be moved and that failed.
*/
int sqlite3WalUpgradeSnapshot(Wal *pWal){
  assert( pWal->writeLock );
  memcpy(&pWal->hdr, (void*)walIndexHdr(pWal), sizeof(WalIndexHdr));

  /* Nothing more to do unless the read-lock is on slot aReadmark[0]. */
  if( pWal->readLock!=0 ) return SQLITE_OK;

  /* Slot 0 may be kept if the entire wal has been checkpointed. */
  if( pWal->hdr.mxFrame==walCkptInfo(pWal)->nBackfill ) return SQLITE_OK;

  /* Otherwise switch to a different slot. If the lock stayed on
  ** aReadmark[0], any reads performed between now and committing the
  ** transaction would read from the old snapshot - not the one just
  ** upgraded to.  */
  return walUpgradeReadlock(pWal);
}

/*
** End a write transaction.  The commit has already been done.  This
** routine merely releases the lock.
*/
int sqlite3WalEndWriteTransaction(Wal *pWal){
................................................................................
**
** SQLITE_OK is returned if no error is encountered (regardless of whether
** or not pWal->hdr.mxFrame is modified). An SQLite error code is returned
** if an error occurs.
*/
static int walRestartLog(Wal *pWal){
  int rc = SQLITE_OK;


  if( pWal->readLock==0 ){
    volatile WalCkptInfo *pInfo = walCkptInfo(pWal);
    assert( pInfo->nBackfill==pWal->hdr.mxFrame );
    if( pInfo->nBackfill>0 ){
      u32 salt1;
      sqlite3_randomness(4, &salt1);
................................................................................
        ** to handle if this transaction is rolled back.  */
        walRestartHdr(pWal, salt1);
        walUnlockExclusive(pWal, WAL_READ_LOCK(1), WAL_NREADER-1);
      }else if( rc!=SQLITE_BUSY ){
        return rc;
      }
    }


    /* Regardless of whether or not the wal file was restarted, change the
    ** read-lock held by this client to a slot other than aReadmark[0]. 
    ** Clients with a lock on aReadmark[0] read from the database file 
    ** only - never from the wal file. This means that if a writer holding
    ** a lock on aReadmark[0] were to commit a transaction but not close the
    ** read-transaction, subsequent read operations would read directly from
    ** the database file - ignoring the new pages just appended
    ** to the wal file. */
    rc = walUpgradeReadlock(pWal);

  }
  return rc;
}

/*
** Information about the current state of the WAL file and where
** the next fsync should occur - passed from sqlite3WalFrames() into

Changes to src/wal.h.

123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
/* Return true if the argument is non-NULL and the WAL module is using
** heap-memory for the wal-index. Otherwise, if the argument is NULL or the
** WAL module is using shared-memory, return false. 
*/
int sqlite3WalHeapMemory(Wal *pWal);

/* NOTE(review): the definition of sqlite3WalLockForCommit() is not visible
** from here, so its contract is left undocumented - TODO add a description
** of pPg, pRead and the return value once confirmed against wal.c.
*/
int sqlite3WalLockForCommit(Wal *pWal, PgHdr *pPg, Bitvec *pRead);

/* Overwrite the Wal handle's private copy of the wal-index header with the
** current wal-index header. The caller must hold the wal write-lock.
*/
void sqlite3WalUpgradeSnapshot(Wal *pWal);

#ifdef SQLITE_ENABLE_ZIPVFS
/* If the WAL file is not empty, return the number of bytes of content
** stored in each frame (i.e. the db page-size when the WAL was created).
*/
int sqlite3WalFramesize(Wal *pWal);
#endif

#endif /* ifndef SQLITE_OMIT_WAL */
#endif /* _WAL_H_ */







|










123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
/* Return true if the argument is non-NULL and the WAL module is using
** heap-memory for the wal-index. Otherwise, if the argument is NULL or the
** WAL module is using shared-memory, return false. 
*/
int sqlite3WalHeapMemory(Wal *pWal);

/* NOTE(review): the definition of sqlite3WalLockForCommit() is not visible
** from here, so its contract is left undocumented - TODO add a description
** of pPg, pRead and the return value once confirmed against wal.c.
*/
int sqlite3WalLockForCommit(Wal *pWal, PgHdr *pPg, Bitvec *pRead);

/* Overwrite the Wal handle's private copy of the wal-index header with the
** current wal-index header. The caller must hold the wal write-lock. If the
** handle's read-lock is on slot aReadmark[0] and the wal has not been
** entirely checkpointed, the read-lock is moved to a different slot.
** Returns SQLITE_OK on success, or an SQLite error code otherwise.
*/
int sqlite3WalUpgradeSnapshot(Wal *pWal);

#ifdef SQLITE_ENABLE_ZIPVFS
/* If the WAL file is not empty, return the number of bytes of content
** stored in each frame (i.e. the db page-size when the WAL was created).
*/
int sqlite3WalFramesize(Wal *pWal);
#endif

#endif /* ifndef SQLITE_OMIT_WAL */
#endif /* _WAL_H_ */

Changes to test/unlocked3.test.

90
91
92
93
94
95
96
97

98
99
100
101
102
103
104
105
106
...
115
116
117
118
119
120
121
122
123
124
125
126
127
128
  . -----------------------
  4 {1i}
  5 {1d 2i}
  . -----------------------
  6 {1iii 2iii 3iii 4iii}
  7 {1di  2id  3iii 4ddd}
  8 {1iii 2iii 3iii 4iii}


} {
  # 9 {1D  2II}
  if {[string range $oplist 0 0]=="-"} {
    reset_db
    create_schema
    continue
  }
  foreach db $DBLIST { sqlite3 $db test.db }

................................................................................
    }

    foreach db $DBLIST { $db eval "COMMIT" }
    db eval {PRAGMA integrity_check}
  } {ok}

  foreach db $DBLIST { 
    if {$db=="db2"} breakpoint
    $db close 
  }
}

finish_test








<
>

<







 







<






90
91
92
93
94
95
96

97
98

99
100
101
102
103
104
105
...
114
115
116
117
118
119
120

121
122
123
124
125
126
  . -----------------------
  4 {1i}
  5 {1d 2i}
  . -----------------------
  6 {1iii 2iii 3iii 4iii}
  7 {1di  2id  3iii 4ddd}
  8 {1iii 2iii 3iii 4iii}

  9 {1D  2II}
} {

  if {[string range $oplist 0 0]=="-"} {
    reset_db
    create_schema
    continue
  }
  foreach db $DBLIST { sqlite3 $db test.db }

................................................................................
    }

    foreach db $DBLIST { $db eval "COMMIT" }
    db eval {PRAGMA integrity_check}
  } {ok}

  foreach db $DBLIST { 

    $db close 
  }
}

finish_test