SQLite

Check-in [25f85f6872]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Change walthread.test so that tests can be run with either multiple threads or multiple processes.
Downloads: Tarball | ZIP archive
Timelines: family | ancestors | descendants | both | wal
Files: files | file ages | folders
SHA1: 25f85f68723e56c18e44b094d85f67b99912dc86
User & Date: dan 2010-04-28 17:48:44.000
Context
2010-04-28
17:49
Merge two "wal" leaves. (check-in: 13d2d5a66e user: dan tags: wal)
17:48
Change walthread.test so that tests can be run with either multiple threads or multiple processes. (check-in: 25f85f6872 user: dan tags: wal)
2010-04-27
18:49
Merge two "wal" leaves. (check-in: 8c2d43babd user: dan tags: wal)
Changes
Unified Diff Ignore Whitespace Patch
Changes to src/test_thread.c.
54
55
56
57
58
59
60

61
62
63
64
65
66
67
static Tcl_ObjCmdProc sqlthread_proc;
static Tcl_ObjCmdProc clock_seconds_proc;
#if defined(SQLITE_OS_UNIX) && defined(SQLITE_ENABLE_UNLOCK_NOTIFY)
static Tcl_ObjCmdProc blocking_step_proc;
static Tcl_ObjCmdProc blocking_prepare_v2_proc;
#endif
int Sqlitetest1_Init(Tcl_Interp *);


/* Functions from test1.c */
void *sqlite3TestTextToPtr(const char *);
const char *sqlite3TestErrorName(int);
int getDbPointer(Tcl_Interp *, const char *, sqlite3 **);
int sqlite3TestMakePointerStr(Tcl_Interp *, char *, void *);
int sqlite3TestErrCode(Tcl_Interp *, sqlite3 *, int);







>







54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
static Tcl_ObjCmdProc sqlthread_proc;
static Tcl_ObjCmdProc clock_seconds_proc;
#if defined(SQLITE_OS_UNIX) && defined(SQLITE_ENABLE_UNLOCK_NOTIFY)
static Tcl_ObjCmdProc blocking_step_proc;
static Tcl_ObjCmdProc blocking_prepare_v2_proc;
#endif
int Sqlitetest1_Init(Tcl_Interp *);
int Sqlite3_Init(Tcl_Interp *);

/* Functions from test1.c */
void *sqlite3TestTextToPtr(const char *);
const char *sqlite3TestErrorName(int);
int getDbPointer(Tcl_Interp *, const char *, sqlite3 **);
int sqlite3TestMakePointerStr(Tcl_Interp *, char *, void *);
int sqlite3TestErrCode(Tcl_Interp *, sqlite3 *, int);
120
121
122
123
124
125
126

127
128
129
130
131
132
133
  Tcl_CreateObjCommand(interp, 
      "sqlite3_blocking_prepare_v2", blocking_prepare_v2_proc, (void *)1, 0);
  Tcl_CreateObjCommand(interp, 
      "sqlite3_nonblocking_prepare_v2", blocking_prepare_v2_proc, 0, 0);
#endif
  Sqlitetest1_Init(interp);
  Sqlitetest_mutex_Init(interp);


  rc = Tcl_Eval(interp, p->zScript);
  pRes = Tcl_GetObjResult(interp);
  pList = Tcl_NewObj();
  Tcl_IncrRefCount(pList);
  Tcl_IncrRefCount(pRes);








>







121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
  Tcl_CreateObjCommand(interp, 
      "sqlite3_blocking_prepare_v2", blocking_prepare_v2_proc, (void *)1, 0);
  Tcl_CreateObjCommand(interp, 
      "sqlite3_nonblocking_prepare_v2", blocking_prepare_v2_proc, 0, 0);
#endif
  Sqlitetest1_Init(interp);
  Sqlitetest_mutex_Init(interp);
  Sqlite3_Init(interp);

  rc = Tcl_Eval(interp, p->zScript);
  pRes = Tcl_GetObjResult(interp);
  pList = Tcl_NewObj();
  Tcl_IncrRefCount(pList);
  Tcl_IncrRefCount(pRes);

Changes to src/wal.c.
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
  if( pLog->isLocked ){
    assert( pLog->isLocked==LOG_REGION_A || pLog->isLocked==LOG_REGION_D );
    logLockRegion(pLog, pLog->isLocked, LOG_UNLOCK);
  }
  pLog->isLocked = 0;
}

/* 
** Read a page from the log, if it is present. 
*/
int sqlite3WalRead(Log *pLog, Pgno pgno, int *pInLog, u8 *pOut){
  LogSummary *pSummary = pLog->pSummary;
  u32 iRead = 0;
  u32 *aData; 
  int iFrame = (pLog->hdr.iLastPg & 0xFFFFFF00);







|







1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
  if( pLog->isLocked ){
    assert( pLog->isLocked==LOG_REGION_A || pLog->isLocked==LOG_REGION_D );
    logLockRegion(pLog, pLog->isLocked, LOG_UNLOCK);
  }
  pLog->isLocked = 0;
}

/*
** Read a page from the log, if it is present. 
*/
int sqlite3WalRead(Log *pLog, Pgno pgno, int *pInLog, u8 *pOut){
  LogSummary *pSummary = pLog->pSummary;
  u32 iRead = 0;
  u32 *aData; 
  int iFrame = (pLog->hdr.iLastPg & 0xFFFFFF00);
Changes to test/lock_common.tcl.
41
42
43
44
45
46
47





















48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
    }
    if {[eof $chan]} {
      return "ERROR: Child process hung up"
    }
    append r $line
  }
}






















# Write the main loop for the child testfixture processes into file
# tf_main.tcl. The parent (this script) interacts with the child processes
# via a two way pipe. The parent writes a script to the stdin of the child
# process, followed by the word "OVER" on a line of its own. The child
# process evaluates the script and writes the results to stdout, followed
# by an "OVER" of its own.
set f [open tf_main.tcl w]
puts $f {
  set l [open log w]
  set script ""
  while {![eof stdin]} {
    flush stdout
    set line [gets stdin]
    puts $l "READ $line"
    if { $line == "OVER" } {
      catch {eval $script} result
      puts $result
      puts $l "WRITE $result"
      puts OVER
      puts $l "WRITE OVER"
      flush stdout
      set script ""
    } else {
      append script $line
      append script " ; "
    }
  }
  close $l
}
close $f







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
















|








|





41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
    }
    if {[eof $chan]} {
      return "ERROR: Child process hung up"
    }
    append r $line
  }
}

# Readable-event handler for the pipe connected to a child testfixture
# process started by [testfixture_nb].
#
#   varname   Name of the variable that receives the child's output. It is
#             set at global scope, which is where [vwait] looks for it.
#   chan      The non-blocking pipe channel to the child process.
#
# Lines read from the child are accumulated (without line separators) in
# ::tfnb($chan). When a line consisting of "OVER" arrives, the accumulated
# text is stored in the result variable and the channel is closed.
#
# If the child closes its end of the pipe before sending "OVER", the result
# variable is set to the same "ERROR: Child process hung up" message used by
# the blocking reader in this file. Without this, the readable event would
# keep firing at end-of-file and any caller waiting on the variable would
# never wake up.
#
proc testfixture_nb_cb {varname chan} {
  if {[gets $chan line] < 0} {
    if {[eof $chan]} {
      # The child went away without completing its reply. Tear down the
      # channel (which also removes this handler) and report the failure.
      unset -nocomplain ::tfnb($chan)
      catch { close $chan }
      uplevel #0 [list set $varname "ERROR: Child process hung up"]
    }
    # Otherwise the channel is non-blocking and a complete line is not yet
    # available. Wait for the next readable event.
    return
  }

  if { $line eq "OVER" } {
    set result $::tfnb($chan)
    unset ::tfnb($chan)
    close $chan
    # Set the variable last so that a waiter only wakes up once all
    # per-channel state has been cleaned up.
    uplevel #0 [list set $varname $result]
  } else {
    append ::tfnb($chan) $line
  }
}

# Run script $cmd in a freshly launched child testfixture process without
# blocking the caller.
#
#   varname   Name of the variable handed to [testfixture_nb_cb], which
#             stores the child's output in it once the reply is complete.
#   cmd       Script text sent to the child, followed by an "OVER" line.
#
# Always returns an empty string; use [vwait] on varname to collect the
# result.
#
proc testfixture_nb {varname cmd} {
  set pipe [launch_testfixture]

  # Start with an empty reply buffer for this channel.
  set ::tfnb($pipe) ""
  fconfigure $pipe -blocking 0 -buffering none

  # Send the script, then the terminator line.
  foreach msg [list $cmd OVER] {
    puts $pipe $msg
  }

  fileevent $pipe readable [list testfixture_nb_cb $varname $pipe]
  return ""
}

# Write the main loop for the child testfixture processes into file
# tf_main.tcl. The parent (this script) interacts with the child processes
# via a two way pipe. The parent writes a script to the stdin of the child
# process, followed by the word "OVER" on a line of its own. The child
# process evaluates the script and writes the results to stdout, followed
# by an "OVER" of its own.
set f [open tf_main.tcl w]
puts $f {
  set l [open log w]
  set script ""
  while {![eof stdin]} {
    flush stdout
    set line [gets stdin]
    puts $l "READ $line"
    if { $line == "OVER" } {
      set rc [catch {eval $script} result]
      puts $result
      puts $l "WRITE $result"
      puts OVER
      puts $l "WRITE OVER"
      flush stdout
      set script ""
    } else {
      append script $line
      append script "\n"
    }
  }
  close $l
}
close $f
Changes to test/walthread.test.
12
13
14
15
16
17
18

19
20
21
22
23
24
25
26
27
28






29







30
31


32
33
34

35
36











37
38
39
40


41


42

43





44
45
46


47



48
49





50
51

52





































































53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83

84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100




101
102
103
104
105

106





107
108
109
110

111
112
113
114
115
116
117


118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169




170
171
172
173
174
175
176
177
178
179
180
181
182
183
184

185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
# focus of this file is testing the operation of the library in
# "PRAGMA journal_mode=WAL" mode with multiple threads.
#

set testdir [file dirname $argv0]

source $testdir/tester.tcl

if {[run_thread_tests]==0} { finish_test ; return }

set sqlite_walsummary_mmap_incr 64

#--------------------------------------------------------------------------
# Initialize the database used for the multi-thread test.
#
do_test walthread-1.1 {
  execsql {
    PRAGMA journal_mode = WAL;






    PRAGMA lock_status;







    CREATE TABLE t1(x PRIMARY KEY);
    PRAGMA lock_status;


    INSERT INTO t1 VALUES(randomblob(100));
    INSERT INTO t1 VALUES(randomblob(100));
    INSERT INTO t1 SELECT md5sum(x) FROM t1;

  }
} {wal main unlocked temp closed main shared temp closed}











do_test walthread-1.2 {
  execsql {
    SELECT (SELECT count(*) FROM t1), (
      SELECT md5sum(x) FROM t1 WHERE oid != (SELECT max(oid) FROM t1)


    ) == (


      SELECT x FROM t1 WHERE oid = (SELECT max(oid) FROM t1)

    )





  }
} {3 1}
do_test walthread-1.3 {


  execsql { PRAGMA integrity_check } 



} {ok}
do_test walthread-1.4 {





  execsql { PRAGMA lock_status } 
} {main shared temp unknown}







































































#--------------------------------------------------------------------------
# Start N threads. Each thread performs both read and write transactions.
# Each read transaction consists of:
#
#   1) Reading the md5sum of all but the last table row,
#   2) Running integrity check.
#   3) Reading the value stored in the last table row,
#   4) Check that the values read in steps 1 and 3 are the same, and that
#      the md5sum of all but the last table row has not changed.
#
# Each write transaction consists of:
#
#   1) Modifying the contents of t1 (inserting, updating, deleting rows).
#   2) Appending a new row to the table containing the md5sum() of all
#      rows in the table.
#
# Each of the N threads runs N read transactions followed by a single write
# transaction in a loop as fast as possible.
#
# There is also a single checkpointer thread. It runs the following loop:
#
#   1) Execute "CHECKPOINT main 32 -1 1"
#   2) Sleep for 500 ms.
#
set thread_program {
  proc rest {ms} {
    set ::rest 0
    after $ms {set ::rest 1}
    vwait ::rest
  }


  proc dosql {DB sql} {
    set res ""
    set stmt [sqlite3_prepare_v2 $DB $sql -1 dummy_tail]
    set rc [sqlite3_step $stmt]
    if {$rc eq "SQLITE_ROW"} {
      set res [sqlite3_column_text $stmt 0]
    }
    set rc [sqlite3_finalize $stmt]

    if {$rc ne "SQLITE_OK"} {
      error "$rc: [sqlite3_errmsg $DB]"
    }
    return $res
  }

  proc read_transaction {DB} {
    dosql $DB BEGIN





    set md5_1 [dosql $DB {
      SELECT md5sum(x) FROM t1 WHERE rowid != (SELECT max(rowid) FROM t1)
    }]
    set check [dosql $DB { PRAGMA integrity_check }]

    set md5_2 [dosql $DB { 





      SELECT x FROM t1 WHERE rowid = (SELECT max(rowid) FROM t1)
    }]
    set md5_3 [dosql $DB {
      SELECT md5sum(x) FROM t1 WHERE rowid != (SELECT max(rowid) FROM t1)

    }]

    dosql $DB COMMIT

    if {$check ne "ok" 
     || $md5_1 ne $md5_2
     || $md5_2 ne $md5_3


    } {
      error "Failed read transaction $check $md5_1 $md5_2 $md5_3"
    }
  }

  proc write_transaction {DB} {
    dosql $DB BEGIN
    dosql $DB "INSERT INTO t1 VALUES(randomblob(100))"
    dosql $DB "INSERT INTO t1 VALUES(randomblob(100))"
    dosql $DB "INSERT INTO t1 SELECT md5sum(x) FROM t1"
    dosql $DB COMMIT
  }

  proc checkpointer {DB} {
    while { !$::finished } {
      dosql $DB "PRAGMA checkpoint"
      rest 1000
    }
  }

  proc worker {DB N} {
    set j 0
    while { !$::finished } {
      for {set i 0} {$i < $N} {incr i} { read_transaction $DB }
      write_transaction $DB
      rest 1
    }
  }

  set ::finished 0
  after [expr $seconds*1000] {set ::finished 1}

  set ::DB [sqlthread open test.db]
  dosql $::DB { PRAGMA journal_mode = WAL }


  set rc [catch {
    if {$role eq "worker"} { worker $DB $N }
    if {$role eq "checkpointer"} { checkpointer $DB }
  } msg]

  sqlite3_close $::DB

  if {$rc==0} { set msg OK } 
  set msg
}

set NTHREAD 6
set SECONDS 30

#set prg "set N $NTHREAD ; set seconds $SECONDS"
set prg "set N 1 ; set seconds $SECONDS"





array unset finished
for {set i 0} {$i < $NTHREAD} {incr i} {
  thread_spawn finished($i) {set role worker} $prg $thread_program
}
thread_spawn finished(C) {set role checkpointer} $prg $thread_program
#set finished(C) 1

puts "... test runs for approximately $SECONDS seconds ..."
for {set i 0} {$i < $::NTHREAD} {incr i} {
  if {![info exists finished($i)]} {
    vwait finished($i)
  }
  do_test walthread-2.$i {
    set ::finished($i)

  } OK
}
do_test walthread-2.C {
  if {![info exists finished(C)]} { vwait finished(C) }
  set ::finished(C)
} OK

set logsize 0

set rows    [execsql { SELECT count(*) FROM t1 }]
catch { set logsize [expr [file size test.db-wal] / 1024] }
set dbsize  [expr [file size test.db] / 1024]

puts "rows=$rows db=${dbsize}K log=${logsize}K"

finish_test









>




|
|

|
|
|
>
>
>
>
>
>
|
>
>
>
>
>
>
>
|
|
>
>
|
<
<
>
|
|
>
>
>
>
>
>
>
>
>
>
>
|
|
|
<
>
>
|
>
>
|
>
|
>
>
>
>
>
|
|
|
>
>
|
>
>
>
|
|
>
>
>
>
>
|
|
>
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>

|
|


















|


<
<
<
<
<
|
|
>
|
<
<
<
<
<
|
<
|
<
|
<
|
<
|
<
<
>
>
>
>
|
<
<
<
<
>
|
>
>
>
>
>
|
<
<
|
>
|
|
<
|
|
<
<
>
>
|
|
|
|
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
|
<
<
<
|
|
<
<
|
<
<
|
<
<
|
|
<
<
<
<
|
<
|
<
<
|
|
|
<
|
<
<
>
>
>
>
|
|
<
<
|
|
|
|
<
<
<
<
<
<
<
>
|
<
<
<
<
|
|
|
|
<
<
<
|
<


<

12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48


49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65

66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191





192
193
194
195





196

197

198

199

200


201
202
203
204
205




206
207
208
209
210
211
212
213


214
215
216
217

218
219


220
221
222
223
224
225
















226



227
228


229


230


231
232




233

234


235
236
237

238


239
240
241
242
243
244


245
246
247
248







249
250




251
252
253
254



255

256
257

258
# focus of this file is testing the operation of the library in
# "PRAGMA journal_mode=WAL" mode with multiple threads.
#

set testdir [file dirname $argv0]

source $testdir/tester.tcl
source $testdir/lock_common.tcl
if {[run_thread_tests]==0} { finish_test ; return }

set sqlite_walsummary_mmap_incr 64

# The number of threads to start. And the amount of time to run the test
# for. Respectively.
#
set NTHREAD 10
set SECONDS 5

# The parameter is the name of a variable in the callers context. The
# variable may or may not exist when this command is invoked.
#
# If the variable does exist, its value is returned. Otherwise, this
# command uses [vwait] to wait until it is set, then returns the value.
# In other words, this is a version of the [set VARNAME] command that
# blocks until a variable exists.
#
# Blocking variant of [set VARNAME]: return the value of the named variable,
# first waiting in the event loop (via [vwait]) if it does not exist yet.
# The name is resolved in the caller's context.
proc wait_for_var {varname} {
  set present [uplevel 1 [list info exists $varname]]
  if {!$present} {
    uplevel 1 [list vwait $varname]
  }
  return [uplevel 1 [list set $varname]]
}

# Remove the first element from the list stored in the caller's variable
# named by lvar and return it. An empty list yields an empty string and is
# left empty.
proc lshift {lvar} {
  upvar 1 $lvar queue
  set head [lindex $queue 0]
  set queue [lrange $queue 1 end]
  return $head
}


#-------------------------------------------------------------------------
#   do_thread_test TESTNAME OPTIONS...
# 
# where OPTIONS are: 
#
#   -seconds   SECONDS                How many seconds to run the test for
#   -init      SCRIPT                 Script to run before test.
#   -thread    NAME COUNT SCRIPT      Scripts to run in threads (or processes).
#   -processes BOOLEAN                True to use processes instead of threads.
#
proc do_thread_test {args} {
  # Work on a copy of the argument list; [lshift] consumes it from the front.
  set A $args


  # P holds the parsed configuration. The first argument is the test name;
  # everything else is optional and defaults to the values set here.
  set P(testname) [lshift A]
  set P(seconds) 5
  set P(init) ""
  set P(threads) [list]
  set P(processes) 0

  # Discard results left over from any previous invocation.
  unset -nocomplain ::done

  # Parse the remaining options. Each -thread option appends one
  # NAME/COUNT/SCRIPT triple to P(threads).
  while {[llength $A]>0} {
    set a [lshift A]
    switch -glob -- $a {
      -seconds {
        set P(seconds) [lshift A]
      }

      -init {
        set P(init) [lshift A]
      }

      -processes {
        set P(processes) [lshift A]
      }

      -thread {
        set name  [lshift A]
        set count [lshift A]
        set prg   [lshift A]
        lappend P(threads) [list $name $count $prg]
      }

      default {
        error "Unknown option: $a"
      }
    }
  }

  puts "Running $P(testname) for $P(seconds) seconds..."

  # Start from a fresh database: close any open handle named db (errors
  # ignored), delete the database and its journal/WAL files, then run the
  # -init script with a new handle open. The script is evaluated in the
  # scope of this proc.
  catch { db close }
  file delete -force test.db test.db-journal test.db-wal

  sqlite3 db test.db
  eval $P(init)
  db close

  # Launch COUNT instances of each -thread script.
  foreach T $P(threads) {
    set name  [lindex $T 0]
    set count [lindex $T 1]
    set prg   [lindex $T 2]

    for {set i 1} {$i <= $count} {incr i} {
      # Build the program for instance $i by substituting the test script,
      # the run time and the instance number into the wrapper template
      # below. The wrapper evaluates to a two-element list: the [catch]
      # return code and the result (or error message) of the test script.
      set program [string map [list %TEST% $prg %SECONDS% $P(seconds) %I% $i] {

        set tid %I%

        proc usleep {ms} {
          set ::usleep 0
          after $ms {set ::usleep 1}
          vwait ::usleep
        }
        proc busyhandler {n} { usleep 10 ; return 0 }

        sqlite3 db test.db
        db busy busyhandler
        db eval { SELECT randomblob($tid*5) }

        set ::finished 0
        after [expr %SECONDS% * 1000] {set ::finished 1}
        proc tt_continue {} { expr ($::finished==0) }

        set rc [catch { %TEST% } msg]

        db close
        list $rc $msg
      }]

      # Run the program either in a thread of this process or in a child
      # testfixture process. In both cases ::done($name,$i) is the variable
      # passed in to receive the result; it is waited on below.
      if {$P(processes)==0} {
        sqlthread spawn ::done($name,$i) $program
      } else {
        testfixture_nb ::done($name,$i) $program
      }
    }
  }

  # Collect the results in launch order. For each instance one test case
  # checks that the return code (first element of the result) is 0; the
  # second element of each result is added to the printed report.
  set report "  Results:"
  foreach T $P(threads) {
    set name  [lindex $T 0]
    set count [lindex $T 1]
    set prg   [lindex $T 2]

    set reslist [list]
    for {set i 1} {$i <= $count} {incr i} {
      set res [wait_for_var ::done($name,$i)]
      lappend reslist [lindex $res 1]
      do_test $P(testname).$name.$i [list lindex $res 0] 0
    }

    append report "   $name $reslist"
  }
  puts $report
}

#--------------------------------------------------------------------------
# Start NTHREAD threads. Each thread performs both read and write 
# transactions. Each read transaction consists of:
#
#   1) Reading the md5sum of all but the last table row,
#   2) Running integrity check.
#   3) Reading the value stored in the last table row,
#   4) Check that the values read in steps 1 and 3 are the same, and that
#      the md5sum of all but the last table row has not changed.
#
# Each write transaction consists of:
#
#   1) Modifying the contents of t1 (inserting, updating, deleting rows).
#   2) Appending a new row to the table containing the md5sum() of all
#      rows in the table.
#
# Each of the NTHREAD threads runs one read transaction followed by one write
# transaction in a loop, pausing briefly between iterations.
#
# There is also a single checkpointer thread. It runs the following loop:
#
#   1) Execute "PRAGMA checkpoint"
#   2) Sleep for 500 ms.
#






foreach {mode name} {
      0 walthread-1-threads 
      1 walthread-1-processes





} {

  do_thread_test $name -processes $mode -seconds $SECONDS -init {

    execsql {

      PRAGMA journal_mode = WAL;

      CREATE TABLE t1(x PRIMARY KEY);


      PRAGMA lock_status;
      INSERT INTO t1 VALUES(randomblob(100));
      INSERT INTO t1 VALUES(randomblob(100));
      INSERT INTO t1 SELECT md5sum(x) FROM t1;
    }




  } -thread main $NTHREAD {
  
    proc read_transaction {} {
      set results [db eval {
        BEGIN;
          PRAGMA integrity_check;
          SELECT md5sum(x) FROM t1 WHERE rowid != (SELECT max(rowid) FROM t1);
          SELECT x FROM t1 WHERE rowid = (SELECT max(rowid) FROM t1);


          SELECT md5sum(x) FROM t1 WHERE rowid != (SELECT max(rowid) FROM t1);
        COMMIT;
      }]
  

      if {[llength $results]!=4
       || [lindex $results 0] != "ok"


       || [lindex $results 1] != [lindex $results 2]
       || [lindex $results 2] != [lindex $results 3]
      } {
        error "Failed read transaction: $results"
      }
    }
















  



    proc write_transaction {} {
      db eval {


        BEGIN;


          INSERT INTO t1 VALUES(randomblob(100));


          INSERT INTO t1 VALUES(randomblob(100));
          INSERT INTO t1 SELECT md5sum(x) FROM t1;




        COMMIT;

      }


    }
  
    set nRun 0

    while {[tt_continue]} {


      read_transaction
      write_transaction 
      usleep 1
      incr nRun
    }
    set nRun


  
  } -thread ckpt 1 {
    set nRun 0
    while {[tt_continue]} {







      db eval "PRAGMA checkpoint"
      usleep 500




      incr nRun
    }
    set nRun
  }



}


finish_test