/ Check-in [b51a5f8b]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Fix a problem that occurs when one process causes the log-summary file to grow and then a second process attempts to read the database.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | wal
Files: files | file ages | folders
SHA1: b51a5f8bc660616aa264025dd7ad4bdab458814b
User & Date: dan 2010-04-27 05:42:32
Context
2010-04-27
06:49
Run trans.test and avtrans.test as part of the "wal" permutation. check-in: c8893310 user: dan tags: wal
05:42
Fix a problem that occurs when one process causes the log-summary file to grow and then a second process attempts to read the database. check-in: b51a5f8b user: dan tags: wal
01:56
Merge in recent changes from the trunk check-in: 7a0ac682 user: drh tags: wal
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to src/wal.c.

587
588
589
590
591
592
593
















594
595
596
597
598
599
600
...
609
610
611
612
613
614
615
616
617
618
619

620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
...
645
646
647
648
649
650
651


652
653
654
655
656
657
658
....
1492
1493
1494
1495
1496
1497
1498










1499
1500
1501
1502
1503
1504
1505
....
1512
1513
1514
1515
1516
1517
1518

1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532

1533
1534
1535
1536
1537
1538
1539
    return SQLITE_IOERR;
  }
  pSummary->aData = (u32 *)pMap;
  pSummary->nData = nByte/4;

  return SQLITE_OK;
}

















/*
** Return the index in the LogSummary.aData array that corresponds to 
** frame iFrame. The log-summary file consists of a header, followed by
** alternating "map" and "index" blocks.
*/
static int logSummaryEntry(u32 iFrame){
................................................................................
/*
** Set an entry in the log-summary map to map log frame iFrame to db 
** page iPage. Values are always appended to the log-summary (i.e. the
** value of iFrame is always exactly one more than the value passed to
** the previous call), but that restriction is not enforced or asserted
** here.
*/
static void logSummaryAppend(LogSummary *pSummary, u32 iFrame, u32 iPage){
  u32 iSlot = logSummaryEntry(iFrame);

  while( (iSlot+128)>=pSummary->nData ){

    int nByte = pSummary->nData*4 + LOGSUMMARY_MMAP_INCREMENT;

    /* Unmap and remap the log-summary file. */
    sqlite3_mutex_enter(pSummary->mutex);
    munmap(pSummary->aData, pSummary->nData*4);
    pSummary->aData = 0;
    logSummaryMap(pSummary, nByte);
    sqlite3_mutex_leave(pSummary->mutex);
  }

  /* Set the log-summary entry itself */
  pSummary->aData[iSlot] = iPage;

  /* If the frame number is a multiple of 256 (frames are numbered starting
  ** at 1), build an index of the most recently added 256 frames.
................................................................................
    aTmp = &aIndex[256];

    nIndex = 256;
    for(i=0; i<256; i++) aIndex[i] = (u8)i;
    logMergesort8(aFrame, aTmp, aIndex, &nIndex);
    memset(&aIndex[nIndex], aIndex[nIndex-1], 256-nIndex);
  }


}


/*
** Recover the log-summary by reading the log file. The caller must hold 
** an exclusive lock on the log-summary file.
*/
................................................................................
      return rc;
    }

    rc = logSummaryReadHdr(pLog, pChanged);
    if( rc!=SQLITE_OK ){
      /* An error occurred while attempting log recovery. */
      sqlite3WalCloseSnapshot(pLog);










    }
  }
  return rc;
}

/*
** Unlock the current snapshot.
................................................................................
  pLog->isLocked = 0;
}

/* 
** Read a page from the log, if it is present. 
*/
int sqlite3WalRead(Log *pLog, Pgno pgno, int *pInLog, u8 *pOut){

  u32 iRead = 0;
  u32 *aData; 
  int iFrame = (pLog->hdr.iLastPg & 0xFFFFFF00);

  assert( pLog->isLocked );

  sqlite3_mutex_enter(pLog->pSummary->mutex);
  aData = pLog->pSummary->aData;

  /* Do a linear search of the unindexed block of page-numbers (if any) 
  ** at the end of the log-summary. An alternative to this would be to
  ** build an index in private memory each time a read transaction is
  ** opened on a new snapshot.
  */

  if( pLog->hdr.iLastPg ){
    u32 *pi = &aData[logSummaryEntry(pLog->hdr.iLastPg)];
    u32 *piStop = pi - (pLog->hdr.iLastPg & 0xFF);
    while( *pi!=pgno && pi!=piStop ) pi--;
    if( pi!=piStop ){
      iRead = (pi-piStop) + iFrame;
    }







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







|



>



|
|
|
|
<







 







>
>







 







>
>
>
>
>
>
>
>
>
>







 







>





<
|
<






>







587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
...
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643

644
645
646
647
648
649
650
...
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
....
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
....
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552

1553

1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
    return SQLITE_IOERR;
  }
  pSummary->aData = (u32 *)pMap;
  pSummary->nData = nByte/4;

  return SQLITE_OK;
}

/*
** Resize the mapping of the log-summary file that is currently held in
** pSummary->aData[].
**
** The existing mapping (pSummary->nData entries of 4 bytes each) is
** released with munmap() and logSummaryMap() is then asked to create a
** replacement of nByte bytes. The contract of this routine is that the
** resulting mapping is at least nByte bytes in size, or the size of the
** entire file if that is larger than nByte bytes; the sizing itself is
** done inside logSummaryMap().
**
** Both steps are performed while holding pSummary->mutex. The value
** returned is the result code of logSummaryMap().
*/
static int logSummaryRemap(LogSummary *pSummary, int nByte){
  int rcMap;                      /* Result of the logSummaryMap() call */

  sqlite3_mutex_enter(pSummary->mutex);

  /* Discard the old mapping and clear the pointer to it before a new
  ** mapping is requested. */
  munmap(pSummary->aData, pSummary->nData*4);
  pSummary->aData = 0;
  rcMap = logSummaryMap(pSummary, nByte);

  sqlite3_mutex_leave(pSummary->mutex);

  return rcMap;
}

/*
** Return the index in the LogSummary.aData array that corresponds to 
** frame iFrame. The log-summary file consists of a header, followed by
** alternating "map" and "index" blocks.
*/
static int logSummaryEntry(u32 iFrame){
................................................................................
/*
** Set an entry in the log-summary map to map log frame iFrame to db 
** page iPage. Values are always appended to the log-summary (i.e. the
** value of iFrame is always exactly one more than the value passed to
** the previous call), but that restriction is not enforced or asserted
** here.
*/
static int logSummaryAppend(LogSummary *pSummary, u32 iFrame, u32 iPage){
  u32 iSlot = logSummaryEntry(iFrame);

  while( (iSlot+128)>=pSummary->nData ){
    int rc;
    int nByte = pSummary->nData*4 + LOGSUMMARY_MMAP_INCREMENT;

    /* Unmap and remap the log-summary file. */
    rc = logSummaryRemap(pSummary, nByte);
    if( rc!=SQLITE_OK ){
      return rc;
    }

  }

  /* Set the log-summary entry itself */
  pSummary->aData[iSlot] = iPage;

  /* If the frame number is a multiple of 256 (frames are numbered starting
  ** at 1), build an index of the most recently added 256 frames.
................................................................................
    aTmp = &aIndex[256];

    nIndex = 256;
    for(i=0; i<256; i++) aIndex[i] = (u8)i;
    logMergesort8(aFrame, aTmp, aIndex, &nIndex);
    memset(&aIndex[nIndex], aIndex[nIndex-1], 256-nIndex);
  }

  return SQLITE_OK;
}


/*
** Recover the log-summary by reading the log file. The caller must hold 
** an exclusive lock on the log-summary file.
*/
................................................................................
      return rc;
    }

    rc = logSummaryReadHdr(pLog, pChanged);
    if( rc!=SQLITE_OK ){
      /* An error occurred while attempting log recovery. */
      sqlite3WalCloseSnapshot(pLog);
    }else{
      /* Check if the mapping needs to grow. */
      LogSummary *pSummary = pLog->pSummary;

      if( pLog->hdr.iLastPg 
       && logSummaryEntry(pLog->hdr.iLastPg)>=pSummary->nData 
      ){
        rc = logSummaryRemap(pSummary, 0);
        assert( rc || logSummaryEntry(pLog->hdr.iLastPg)<pSummary->nData );
      }
    }
  }
  return rc;
}

/*
** Unlock the current snapshot.
................................................................................
  pLog->isLocked = 0;
}

/* 
** Read a page from the log, if it is present. 
*/
int sqlite3WalRead(Log *pLog, Pgno pgno, int *pInLog, u8 *pOut){
  LogSummary *pSummary = pLog->pSummary;
  u32 iRead = 0;
  u32 *aData; 
  int iFrame = (pLog->hdr.iLastPg & 0xFFFFFF00);

  assert( pLog->isLocked );

  sqlite3_mutex_enter(pSummary->mutex);


  /* Do a linear search of the unindexed block of page-numbers (if any) 
  ** at the end of the log-summary. An alternative to this would be to
  ** build an index in private memory each time a read transaction is
  ** opened on a new snapshot.
  */
  aData = pSummary->aData;
  if( pLog->hdr.iLastPg ){
    u32 *pi = &aData[logSummaryEntry(pLog->hdr.iLastPg)];
    u32 *piStop = pi - (pLog->hdr.iLastPg & 0xFF);
    while( *pi!=pgno && pi!=piStop ) pi--;
    if( pi!=piStop ){
      iRead = (pi-piStop) + iFrame;
    }

Changes to test/lock2.test.

30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# lock2-1.6: Release the SHARED lock held by the second process. 
# lock2-1.7: Attempt to reacquire a SHARED lock with the second process.
#            this fails due to the PENDING lock.
# lock2-1.8: Ensure the first process can now upgrade to EXCLUSIVE.
#
do_test lock2-1.1 {
  set ::tf1 [launch_testfixture]
  testfixture $::tf1 "sqlite3_test_control_pending_byte $::sqlite_pending_byte"
  testfixture $::tf1 {
    sqlite3 db test.db -key xyzzy
    db eval {select * from sqlite_master}
  }
} {}
do_test lock2-1.1.1 {
  execsql {pragma lock_status}







<







30
31
32
33
34
35
36

37
38
39
40
41
42
43
# lock2-1.6: Release the SHARED lock held by the second process. 
# lock2-1.7: Attempt to reacquire a SHARED lock with the second process.
#            this fails due to the PENDING lock.
# lock2-1.8: Ensure the first process can now upgrade to EXCLUSIVE.
#
do_test lock2-1.1 {
  set ::tf1 [launch_testfixture]

  testfixture $::tf1 {
    sqlite3 db test.db -key xyzzy
    db eval {select * from sqlite_master}
  }
} {}
do_test lock2-1.1.1 {
  execsql {pragma lock_status}

Changes to test/lock_common.tcl.

20
21
22
23
24
25
26

27
28
29
30
31
32
33
# Launch a child process running tf_main.tcl and return a two-way,
# line-buffered channel connected to it.
#
# The program started is the currently running executable, or
# ./testfixture when the name of the executable is not available.
proc launch_testfixture {} {
  set prg [info nameofexecutable]
  if {$prg eq ""} {
    set prg [file join . testfixture]
  }
  # Build the pipeline as a proper Tcl list so that an executable path
  # containing spaces or other list-special characters stays a single word.
  set chan [open "|[list $prg tf_main.tcl]" r+]
  fconfigure $chan -buffering line

  return $chan
}

# Execute a command in a child testfixture process, connected by two-way
# channel $chan. Return the result of the command, or an error message.
proc testfixture {chan cmd} {
  puts $chan $cmd







>







20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# Launch a child process running tf_main.tcl and return a two-way,
# line-buffered channel connected to it.
#
# The program started is the currently running executable, or
# ./testfixture when the name of the executable is not available. Before
# the channel is returned, the command
# "sqlite3_test_control_pending_byte $::sqlite_pending_byte" is sent to the
# child through the testfixture proc.
proc launch_testfixture {} {
  set prg [info nameofexecutable]
  if {$prg eq ""} {
    set prg [file join . testfixture]
  }
  # Build the pipeline as a proper Tcl list so that an executable path
  # containing spaces or other list-special characters stays a single word.
  set chan [open "|[list $prg tf_main.tcl]" r+]
  fconfigure $chan -buffering line
  testfixture $chan "sqlite3_test_control_pending_byte $::sqlite_pending_byte"
  return $chan
}

# Execute a command in a child testfixture process, connected by two-way
# channel $chan. Return the result of the command, or an error message.
proc testfixture {chan cmd} {
  puts $chan $cmd

Changes to test/permutations.test.

754
755
756
757
758
759
760




761
762
763
764
765
766
767
    insert.test   insert2.test  insert3.test rollback.test 
    select1.test  select2.test  select3.test
  }
}

run_tests "wal" -description {
  Run tests with journal_mode=WAL




} -include {
  savepoint.test
  savepoint2.test
  savepoint6.test
}

# End of tests







>
>
>
>







754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
    insert.test   insert2.test  insert3.test rollback.test 
    select1.test  select2.test  select3.test
  }
}

run_tests "wal" -description {
  Run tests with journal_mode=WAL
} -initialize {
  set ::savepoint6_iterations 100
} -shutdown {
  unset -nocomplain ::savepoint6_iterations
} -include {
  savepoint.test
  savepoint2.test
  savepoint6.test
}

# End of tests

Changes to test/savepoint6.test.

22
23
24
25
26
27
28
29


30
31
32
33
34
35
36
...
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
set DATABASE_SCHEMA {
    PRAGMA auto_vacuum = incremental;
    CREATE TABLE t1(x, y);
    CREATE UNIQUE INDEX i1 ON t1(x);
    CREATE INDEX i2 ON t1(y);
}

set ::ITERATIONS 1000



#--------------------------------------------------------------------------
# In memory database state.
#
# ::lSavepoint is a list containing one entry for each active savepoint. The
# first entry in the list corresponds to the most recently opened savepoint.
# Each entry consists of two elements:
................................................................................
  do_test savepoint6-$testname.setup {
    savepoint one
    insert_rows [random_integers 100 1000]
    release one
    checkdb
  } {ok}
  
  for {set i 0} {$i < $::ITERATIONS} {incr i} {
    do_test savepoint6-$testname.$i.1 {
      savepoint_op
      checkdb
    } {ok}
  
    do_test savepoint6-$testname.$i.2 {
      database_op







|
>
>







 







|







22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
...
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
set DATABASE_SCHEMA {
    PRAGMA auto_vacuum = incremental;
    CREATE TABLE t1(x, y);
    CREATE UNIQUE INDEX i1 ON t1(x);
    CREATE INDEX i2 ON t1(y);
}

if {0==[info exists ::savepoint6_iterations]} {
  set ::savepoint6_iterations 1000
}

#--------------------------------------------------------------------------
# In memory database state.
#
# ::lSavepoint is a list containing one entry for each active savepoint. The
# first entry in the list corresponds to the most recently opened savepoint.
# Each entry consists of two elements:
................................................................................
  do_test savepoint6-$testname.setup {
    savepoint one
    insert_rows [random_integers 100 1000]
    release one
    checkdb
  } {ok}
  
  for {set i 0} {$i < $::savepoint6_iterations} {incr i} {
    do_test savepoint6-$testname.$i.1 {
      savepoint_op
      checkdb
    } {ok}
  
    do_test savepoint6-$testname.$i.2 {
      database_op

Changes to test/wal.test.

15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
...
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
...
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
...
819
820
821
822
823
824
825

826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853





















854
855

















856
857
858

set testdir [file dirname $argv0]
source $testdir/tester.tcl
source $testdir/lock_common.tcl

# Close the "db" handle (any error from the close is ignored), delete
# test.db and test.db-wal, then open test.db again with sqlite3_wal.
proc reopen_db {} {
  catch { db close }
  file delete -force test.db test.db-wal
  sqlite3_wal db test.db
}

set ::blobcnt 0
proc blob {nByte} {
  incr ::blobcnt
  return [string range [string repeat "${::blobcnt}x" $nByte] 1 $nByte]
................................................................................
      INSERT INTO t1 SELECT blob(400), blob(400) FROM t1; /*  8 */
      INSERT INTO t1 SELECT blob(400), blob(400) FROM t1; /* 16 */
      INSERT INTO t1 SELECT blob(400), blob(400) FROM t1; /* 32 */
      SELECT count(*) FROM t2;
  }
} {32}
do_test wal-4.5.3 {
breakpoint
  execsql { ROLLBACK TO tr }
} {}
do_test wal-4.5.4 {
  set logsize [file size test.db-wal]
breakpoint
  execsql {
      INSERT INTO t1 VALUES('x', 'y');
    RELEASE tr;
    COMMIT;
  }
  expr { $logsize == [file size test.db-wal] }
} {1}
................................................................................
do_test wal-4.5.5 {
  execsql { SELECT count(*) FROM t2 ; SELECT count(*) FROM t1 }
} {1 2}
do_test wal-4.5.6 {
  file copy -force test.db test2.db
  file copy -force test.db-wal test2.db-wal
  sqlite3 db2 test2.db
breakpoint
  execsql { SELECT count(*) FROM t2 ; SELECT count(*) FROM t1 } db2
} {1 2}
do_test wal-4.5.7 {
  execsql { PRAGMA integrity_check } db2
} {ok}
db2 close

................................................................................
} {B 2}
db2 close
db close

#-------------------------------------------------------------------------
# Test large log summaries.
#

do_test wal-13.1 {
  list [file exists test.db] [file exists test.db-wal]
} {1 0}
do_test wal-13.2 {
  set fd [open test.db-wal w]
  seek $fd [expr 200*1024*1024]
  puts $fd ""
  close $fd
  sqlite3 db test.db
  execsql { SELECT * FROM t2 }
} {B 2}
do_test wal-13.3 {
  db close
  file exists test.db-wal
} {0}
do_test wal-13.4 {
  sqlite3 db test.db
  execsql { SELECT count(*) FROM t2 }
} {1}
do_test wal-13.5 {
  for {set i 0} {$i < 15} {incr i} {
    execsql { INSERT INTO t2 SELECT randomblob(400), randomblob(400) FROM t2 }
  }
  execsql { SELECT count(*) FROM t2 }
} [expr int(pow(2, 15))]
do_test wal-13.6 {
  set sz [file size test.db-wal-summary]
  expr {$sz<=(3*64*1024) && $sz>(2*64*1024)}





















} {1}



















finish_test








|







 







<




<







 







<







 







>
|


|







|



|



|
|



|
|
|
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|

>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
...
245
246
247
248
249
250
251

252
253
254
255

256
257
258
259
260
261
262
...
263
264
265
266
267
268
269

270
271
272
273
274
275
276
...
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894

set testdir [file dirname $argv0]
source $testdir/tester.tcl
source $testdir/lock_common.tcl

# Close the "db" handle (any error from the close is ignored), delete
# test.db together with its test.db-wal and test.db-wal-summary files,
# then open test.db again with sqlite3_wal.
proc reopen_db {} {
  catch { db close }
  file delete -force test.db test.db-wal test.db-wal-summary
  sqlite3_wal db test.db
}

set ::blobcnt 0
proc blob {nByte} {
  incr ::blobcnt
  return [string range [string repeat "${::blobcnt}x" $nByte] 1 $nByte]
................................................................................
      INSERT INTO t1 SELECT blob(400), blob(400) FROM t1; /*  8 */
      INSERT INTO t1 SELECT blob(400), blob(400) FROM t1; /* 16 */
      INSERT INTO t1 SELECT blob(400), blob(400) FROM t1; /* 32 */
      SELECT count(*) FROM t2;
  }
} {32}
do_test wal-4.5.3 {

  execsql { ROLLBACK TO tr }
} {}
do_test wal-4.5.4 {
  set logsize [file size test.db-wal]

  execsql {
      INSERT INTO t1 VALUES('x', 'y');
    RELEASE tr;
    COMMIT;
  }
  expr { $logsize == [file size test.db-wal] }
} {1}
................................................................................
do_test wal-4.5.5 {
  execsql { SELECT count(*) FROM t2 ; SELECT count(*) FROM t1 }
} {1 2}
do_test wal-4.5.6 {
  file copy -force test.db test2.db
  file copy -force test.db-wal test2.db-wal
  sqlite3 db2 test2.db

  execsql { SELECT count(*) FROM t2 ; SELECT count(*) FROM t1 } db2
} {1 2}
do_test wal-4.5.7 {
  execsql { PRAGMA integrity_check } db2
} {ok}
db2 close

................................................................................
} {B 2}
db2 close
db close

#-------------------------------------------------------------------------
# Test large log summaries.
#
set sqlite_walsummary_mmap_incr 64
do_test wal-13.1.1 {
  list [file exists test.db] [file exists test.db-wal]
} {1 0}
do_test wal-13.1.2 {
  set fd [open test.db-wal w]
  seek $fd [expr 200*1024*1024]
  puts $fd ""
  close $fd
  sqlite3 db test.db
  execsql { SELECT * FROM t2 }
} {B 2}
do_test wal-13.1.3 {
  db close
  file exists test.db-wal
} {0}
do_test wal-13.1.4 {
  sqlite3 db test.db
  execsql { SELECT count(*) FROM t2 }
} {1}
do_test wal-13.1.5 {
  for {set i 0} {$i < 6} {incr i} {
    execsql { INSERT INTO t2 SELECT randomblob(400), randomblob(400) FROM t2 }
  }
  execsql { SELECT count(*) FROM t2 }
} [expr int(pow(2, 6))]
do_test wal-13.1.6 {
  file size test.db-wal
} [log_file_size 80 1024]

# Run the same test sequence twice. Each element of the list below sets the
# test-number component tn and defines a proc "buddy" that evaluates a
# script on behalf of a second database connection:
#
#   tn=2: buddy evaluates the script in this process, at global level.
#   tn=3: buddy sends the script to a child process started with
#         launch_testfixture.
#
foreach code [list {
  set tn 2
  proc buddy {tcl} { uplevel #0 $tcl }
} {
  set tn 3
  set ::buddy [launch_testfixture]
  proc buddy {tcl} { testfixture $::buddy $tcl }
}] {

  # Install this variant's tn and buddy proc, then start from a fresh
  # database.
  eval $code
  reopen_db

  # The buddy opens its connection db2 first; the local connection then
  # switches to WAL mode and creates t1 with a single row.
  do_test wal-13.$tn.0 {
    buddy { sqlite3 db2 test.db }
    execsql {
      PRAGMA journal_mode = WAL;
      CREATE TABLE t1(x);
      INSERT INTO t1 SELECT randomblob(400);
    }
    execsql { SELECT count(*) FROM t1 }
  } {1}

  # Fifteen rounds. In each one the buddy connection doubles the number of
  # rows in t1 (step a), after which the local connection must report the
  # same count on two consecutive reads (steps b and c).
  for {set ii 1} {$ii<16} {incr ii} {
    do_test wal-13.$tn.$ii.a {
      buddy { db2 eval { INSERT INTO t1 SELECT randomblob(400) FROM t1 } }
      buddy { db2 eval { SELECT count(*) FROM t1 } }
    } [expr (1<<$ii)]
    do_test wal-13.$tn.$ii.b {
      db eval { SELECT count(*) FROM t1 }
    } [expr (1<<$ii)]
    do_test wal-13.$tn.$ii.c {
      db eval { SELECT count(*) FROM t1 }
    } [expr (1<<$ii)]
  }

  # Clean up. The catch wrappers turn these into no-ops when the db2 handle
  # or the ::buddy channel does not exist in this process.
  catch { db2 close }
  catch { close $::buddy }
  db close
}

finish_test