/ Check-in [25f85f68]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Change walthread.test so that tests can be run with either multiple threads or multiple processes.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | wal
Files: files | file ages | folders
SHA1: 25f85f68723e56c18e44b094d85f67b99912dc86
User & Date: dan 2010-04-28 17:48:44
Context
2010-04-28
17:49
Merge two "wal" leaves. check-in: 13d2d5a6 user: dan tags: wal
17:48
Change walthread.test so that tests can be run with either multiple threads or multiple processes. check-in: 25f85f68 user: dan tags: wal
2010-04-27
18:49
Merge two "wal" leaves. check-in: 8c2d43ba user: dan tags: wal
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to src/test_thread.c.

54
55
56
57
58
59
60

61
62
63
64
65
66
67
...
120
121
122
123
124
125
126

127
128
129
130
131
132
133
static Tcl_ObjCmdProc sqlthread_proc;
static Tcl_ObjCmdProc clock_seconds_proc;
#if defined(SQLITE_OS_UNIX) && defined(SQLITE_ENABLE_UNLOCK_NOTIFY)
static Tcl_ObjCmdProc blocking_step_proc;
static Tcl_ObjCmdProc blocking_prepare_v2_proc;
#endif
int Sqlitetest1_Init(Tcl_Interp *);


/* Functions from test1.c */
void *sqlite3TestTextToPtr(const char *);
const char *sqlite3TestErrorName(int);
int getDbPointer(Tcl_Interp *, const char *, sqlite3 **);
int sqlite3TestMakePointerStr(Tcl_Interp *, char *, void *);
int sqlite3TestErrCode(Tcl_Interp *, sqlite3 *, int);
................................................................................
  Tcl_CreateObjCommand(interp, 
      "sqlite3_blocking_prepare_v2", blocking_prepare_v2_proc, (void *)1, 0);
  Tcl_CreateObjCommand(interp, 
      "sqlite3_nonblocking_prepare_v2", blocking_prepare_v2_proc, 0, 0);
#endif
  Sqlitetest1_Init(interp);
  Sqlitetest_mutex_Init(interp);


  rc = Tcl_Eval(interp, p->zScript);
  pRes = Tcl_GetObjResult(interp);
  pList = Tcl_NewObj();
  Tcl_IncrRefCount(pList);
  Tcl_IncrRefCount(pRes);








>







 







>







54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
...
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
static Tcl_ObjCmdProc sqlthread_proc;
static Tcl_ObjCmdProc clock_seconds_proc;
#if defined(SQLITE_OS_UNIX) && defined(SQLITE_ENABLE_UNLOCK_NOTIFY)
static Tcl_ObjCmdProc blocking_step_proc;
static Tcl_ObjCmdProc blocking_prepare_v2_proc;
#endif
int Sqlitetest1_Init(Tcl_Interp *);
int Sqlite3_Init(Tcl_Interp *);

/* Functions from test1.c */
void *sqlite3TestTextToPtr(const char *);
const char *sqlite3TestErrorName(int);
int getDbPointer(Tcl_Interp *, const char *, sqlite3 **);
int sqlite3TestMakePointerStr(Tcl_Interp *, char *, void *);
int sqlite3TestErrCode(Tcl_Interp *, sqlite3 *, int);
................................................................................
  Tcl_CreateObjCommand(interp, 
      "sqlite3_blocking_prepare_v2", blocking_prepare_v2_proc, (void *)1, 0);
  Tcl_CreateObjCommand(interp, 
      "sqlite3_nonblocking_prepare_v2", blocking_prepare_v2_proc, 0, 0);
#endif
  Sqlitetest1_Init(interp);
  Sqlitetest_mutex_Init(interp);
  Sqlite3_Init(interp);

  rc = Tcl_Eval(interp, p->zScript);
  pRes = Tcl_GetObjResult(interp);
  pList = Tcl_NewObj();
  Tcl_IncrRefCount(pList);
  Tcl_IncrRefCount(pRes);

Changes to src/wal.c.

1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
  if( pLog->isLocked ){
    assert( pLog->isLocked==LOG_REGION_A || pLog->isLocked==LOG_REGION_D );
    logLockRegion(pLog, pLog->isLocked, LOG_UNLOCK);
  }
  pLog->isLocked = 0;
}

/* 
** Read a page from the log, if it is present. 
*/
int sqlite3WalRead(Log *pLog, Pgno pgno, int *pInLog, u8 *pOut){
  LogSummary *pSummary = pLog->pSummary;
  u32 iRead = 0;
  u32 *aData; 
  int iFrame = (pLog->hdr.iLastPg & 0xFFFFFF00);







|







1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
  if( pLog->isLocked ){
    assert( pLog->isLocked==LOG_REGION_A || pLog->isLocked==LOG_REGION_D );
    logLockRegion(pLog, pLog->isLocked, LOG_UNLOCK);
  }
  pLog->isLocked = 0;
}

/*
** Read a page from the log, if it is present. 
*/
int sqlite3WalRead(Log *pLog, Pgno pgno, int *pInLog, u8 *pOut){
  LogSummary *pSummary = pLog->pSummary;
  u32 iRead = 0;
  u32 *aData; 
  int iFrame = (pLog->hdr.iLastPg & 0xFFFFFF00);

Changes to test/lock_common.tcl.

41
42
43
44
45
46
47





















48
49
50
51
52
53
54
..
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
    }
    if {[eof $chan]} {
      return "ERROR: Child process hung up"
    }
    append r $line
  }
}






















# Write the main loop for the child testfixture processes into file
# tf_main.tcl. The parent (this script) interacts with the child processes
# via a two way pipe. The parent writes a script to the stdin of the child
# process, followed by the word "OVER" on a line of its own. The child
# process evaluates the script and writes the results to stdout, followed
# by an "OVER" of its own.
................................................................................
  set l [open log w]
  set script ""
  while {![eof stdin]} {
    flush stdout
    set line [gets stdin]
    puts $l "READ $line"
    if { $line == "OVER" } {
      catch {eval $script} result
      puts $result
      puts $l "WRITE $result"
      puts OVER
      puts $l "WRITE OVER"
      flush stdout
      set script ""
    } else {
      append script $line
      append script " ; "
    }
  }
  close $l
}
close $f







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







|








|





41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
..
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
    }
    if {[eof $chan]} {
      return "ERROR: Child process hung up"
    }
    append r $line
  }
}

# Readable-event handler for a child process started by [testfixture_nb].
#
#   VARNAME   Name of the variable that receives the child's output. It is
#             resolved at global scope, which is where [vwait] looks up the
#             variables it waits on.
#   CHAN      Non-blocking channel connected to the child.
#
# Each complete line read from CHAN is appended to ::tfnb(CHAN). When the
# terminating "OVER" line arrives, the accumulated text is stored in VARNAME
# and the channel is closed.
#
# If the child exits before sending "OVER", VARNAME is set to
# "ERROR: Child process hung up" so that a caller waiting on it is woken up
# rather than blocking forever.
#
proc testfixture_nb_cb {varname chan} {
  if {[gets $chan line] < 0} {
    # No complete line is available. On a non-blocking channel this means
    # either "more data will arrive later" or end-of-file.
    if {[eof $chan]} {
      uplevel #0 [list set $varname "ERROR: Child process hung up"]
      unset -nocomplain ::tfnb($chan)
      # The child is already gone; an error while closing the pipe carries
      # no extra information for the caller.
      catch { close $chan }
    }
    return
  }

  if { $line eq "OVER" } {
    uplevel #0 [list set $varname $::tfnb($chan)]
    unset ::tfnb($chan)
    close $chan
  } else {
    append ::tfnb($chan) $line
  }
}

# Start a child testfixture process and run script CMD in it without
# blocking the caller.
#
# The script is written to the child followed by an "OVER" line. Output from
# the child is collected by [testfixture_nb_cb], which is registered as the
# readable handler and is passed VARNAME as the place to store the result.
# Always returns an empty string.
#
proc testfixture_nb {varname cmd} {
  set child [launch_testfixture]
  fconfigure $child -blocking 0 -buffering none

  # Per-channel accumulator used by the callback.
  set ::tfnb($child) ""

  puts $child $cmd
  puts $child OVER

  set handler [list testfixture_nb_cb $varname $child]
  fileevent $child readable $handler
  return ""
}

# Write the main loop for the child testfixture processes into file
# tf_main.tcl. The parent (this script) interacts with the child processes
# via a two way pipe. The parent writes a script to the stdin of the child
# process, followed by the word "OVER" on a line of its own. The child
# process evaluates the script and writes the results to stdout, followed
# by an "OVER" of its own.
................................................................................
  set l [open log w]
  set script ""
  while {![eof stdin]} {
    flush stdout
    set line [gets stdin]
    puts $l "READ $line"
    if { $line == "OVER" } {
      set rc [catch {eval $script} result]
      puts $result
      puts $l "WRITE $result"
      puts OVER
      puts $l "WRITE OVER"
      flush stdout
      set script ""
    } else {
      append script $line
      append script "\n"
    }
  }
  close $l
}
close $f

Changes to test/walthread.test.

12
13
14
15
16
17
18

19
20
21
22





























23
24

25
26
27
28
29
30
31
32
33
34









35
36
37
38
39
40
41
42





43

44
45















46
47
48
49
50
51




52









































































53
54
55
56
57
58
59
60
61
62
..
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89



90
91








92
93
94

95
96
97
98
99

100
101
102

103
104
105
106
107
108
109
110

111
112
113
114
115
116
117




118
119
120
121
122
123

124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140

141
142
143


144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164

165
166




167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
# focus of this file is testing the operation of the library in
# "PRAGMA journal_mode=WAL" mode with multiple threads.
#

set testdir [file dirname $argv0]

source $testdir/tester.tcl

if {[run_thread_tests]==0} { finish_test ; return }

set sqlite_walsummary_mmap_incr 64






























#--------------------------------------------------------------------------
# Initialize the database used for the multi-thread test.

#
do_test walthread-1.1 {
  execsql {
    PRAGMA journal_mode = WAL;
    PRAGMA lock_status;
    CREATE TABLE t1(x PRIMARY KEY);
    PRAGMA lock_status;
    INSERT INTO t1 VALUES(randomblob(100));
    INSERT INTO t1 VALUES(randomblob(100));
    INSERT INTO t1 SELECT md5sum(x) FROM t1;









  }
} {wal main unlocked temp closed main shared temp closed}
do_test walthread-1.2 {
  execsql {
    SELECT (SELECT count(*) FROM t1), (
      SELECT md5sum(x) FROM t1 WHERE oid != (SELECT max(oid) FROM t1)
    ) == (
      SELECT x FROM t1 WHERE oid = (SELECT max(oid) FROM t1)





    )

  }
} {3 1}















do_test walthread-1.3 {
  execsql { PRAGMA integrity_check } 
} {ok}
do_test walthread-1.4 {
  execsql { PRAGMA lock_status } 
} {main shared temp unknown}














































































#--------------------------------------------------------------------------
# Start N threads. Each thread performs both read and write transactions.
# Each read transaction consists of:
#
#   1) Reading the md5sum of all but the last table row,
#   2) Running integrity check.
#   3) Reading the value stored in the last table row,
#   4) Check that the values read in steps 1 and 3 are the same, and that
#      the md5sum of all but the last table row has not changed.
#
................................................................................
#      rows in the table.
#
# Each of the N threads runs N read transactions followed by a single write
# transaction in a loop as fast as possible.
#
# There is also a single checkpointer thread. It runs the following loop:
#
#   1) Execute "CHECKPOINT main 32 -1 1"
#   2) Sleep for 500 ms.
#
set thread_program {
  proc rest {ms} {
    set ::rest 0
    after $ms {set ::rest 1}
    vwait ::rest
  }

  proc dosql {DB sql} {
    set res ""
    set stmt [sqlite3_prepare_v2 $DB $sql -1 dummy_tail]
    set rc [sqlite3_step $stmt]
    if {$rc eq "SQLITE_ROW"} {
      set res [sqlite3_column_text $stmt 0]



    }
    set rc [sqlite3_finalize $stmt]









    if {$rc ne "SQLITE_OK"} {
      error "$rc: [sqlite3_errmsg $DB]"

    }
    return $res
  }

  proc read_transaction {DB} {

    dosql $DB BEGIN

    set md5_1 [dosql $DB {

      SELECT md5sum(x) FROM t1 WHERE rowid != (SELECT max(rowid) FROM t1)
    }]
    set check [dosql $DB { PRAGMA integrity_check }]
    set md5_2 [dosql $DB { 
      SELECT x FROM t1 WHERE rowid = (SELECT max(rowid) FROM t1)
    }]
    set md5_3 [dosql $DB {
      SELECT md5sum(x) FROM t1 WHERE rowid != (SELECT max(rowid) FROM t1)

    }]

    dosql $DB COMMIT

    if {$check ne "ok" 
     || $md5_1 ne $md5_2
     || $md5_2 ne $md5_3




    } {
      error "Failed read transaction $check $md5_1 $md5_2 $md5_3"
    }
  }

  proc write_transaction {DB} {

    dosql $DB BEGIN
    dosql $DB "INSERT INTO t1 VALUES(randomblob(100))"
    dosql $DB "INSERT INTO t1 VALUES(randomblob(100))"
    dosql $DB "INSERT INTO t1 SELECT md5sum(x) FROM t1"
    dosql $DB COMMIT
  }

  proc checkpointer {DB} {
    while { !$::finished } {
      dosql $DB "PRAGMA checkpoint"
      rest 1000
    }
  }

  proc worker {DB N} {
    set j 0
    while { !$::finished } {

      for {set i 0} {$i < $N} {incr i} { read_transaction $DB }
      write_transaction $DB
      rest 1


    }
  }

  set ::finished 0
  after [expr $seconds*1000] {set ::finished 1}

  set ::DB [sqlthread open test.db]
  dosql $::DB { PRAGMA journal_mode = WAL }


  set rc [catch {
    if {$role eq "worker"} { worker $DB $N }
    if {$role eq "checkpointer"} { checkpointer $DB }
  } msg]

  sqlite3_close $::DB

  if {$rc==0} { set msg OK } 
  set msg
}


set NTHREAD 6
set SECONDS 30





#set prg "set N $NTHREAD ; set seconds $SECONDS"
set prg "set N 1 ; set seconds $SECONDS"

array unset finished
for {set i 0} {$i < $NTHREAD} {incr i} {
  thread_spawn finished($i) {set role worker} $prg $thread_program
}
thread_spawn finished(C) {set role checkpointer} $prg $thread_program
#set finished(C) 1

puts "... test runs for approximately $SECONDS seconds ..."
for {set i 0} {$i < $::NTHREAD} {incr i} {
  if {![info exists finished($i)]} {
    vwait finished($i)
  }
  do_test walthread-2.$i {
    set ::finished($i)
  } OK
}
do_test walthread-2.C {
  if {![info exists finished(C)]} { vwait finished(C) }
  set ::finished(C)
} OK

set logsize 0

set rows    [execsql { SELECT count(*) FROM t1 }]
catch { set logsize [expr [file size test.db-wal] / 1024] }
set dbsize  [expr [file size test.db] / 1024]

puts "rows=$rows db=${dbsize}K log=${logsize}K"

finish_test









>




>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
<
>
|
<
<
<
<
<
<
<
<
<
>
>
>
>
>
>
>
>
>
|
<
<
<
<
<
<
<
>
>
>
>
>
|
>
|
<
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
<
<
<
<
<
>
>
>
>
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>

|
|







 







|


<
<
<
<
<
|
<
<
<
<
<
<
<
>
>
>
|
<
>
>
>
>
>
>
>
>
|
<
<
>
|
<
<
<
|
>
|
<
<
>
|
<
<
<
|
<
<
|
>
|
|
<
<
<
<
<
>
>
>
>
|
|
|
|
|
|
>
|
|
|
|
|
|
|
<
<
<
<
|
<
<
<
|
<
>
|
|
<
>
>

<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
|
|
<
>
|
<
>
>
>
>
|
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
|
|
<
<
<
|
<



<
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

54
55









56
57
58
59
60
61
62
63
64
65







66
67
68
69
70
71
72
73

74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89





90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
...
182
183
184
185
186
187
188
189
190
191





192







193
194
195
196

197
198
199
200
201
202
203
204
205


206
207



208
209
210


211
212



213


214
215
216
217





218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235




236



237

238
239
240

241
242
243

















244
245

246
247

248
249
250
251
252
























253
254



255

256
257
258

# focus of this file is testing the operation of the library in
# "PRAGMA journal_mode=WAL" mode with multiple threads.
#

set testdir [file dirname $argv0]

source $testdir/tester.tcl
source $testdir/lock_common.tcl
if {[run_thread_tests]==0} { finish_test ; return }

set sqlite_walsummary_mmap_incr 64

# The number of threads to start. And the amount of time to run the test
# for. Respectively.
#
set NTHREAD 10
set SECONDS 5

# A blocking version of [set VARNAME].
#
# VARNAME names a variable in the caller's context that may or may not
# exist yet. If it already exists, its value is returned immediately.
# Otherwise [vwait] is used to wait until the variable has been set, and
# the value is returned after that.
#
proc wait_for_var {varname} {
  set present [uplevel 1 [list info exists $varname]]
  if {!$present} {
    uplevel 1 [list vwait $varname]
  }
  return [uplevel 1 [list set $varname]]
}

# Remove the first element from the list stored in the caller's variable
# LVAR and return it. Shifting an empty list returns an empty string and
# leaves the list empty.
#
proc lshift {lvar} {
  upvar 1 $lvar queue
  set head [lindex $queue 0]
  set queue [lrange $queue 1 end]
  return $head
}


#-------------------------------------------------------------------------

#   do_thread_test TESTNAME OPTIONS...
# 









# where OPTIONS are: 
#
#   -seconds   SECONDS                How many seconds to run the test for
#   -init      SCRIPT                 Script to run before test.
#   -thread    NAME COUNT SCRIPT      Scripts to run in threads (or processes).
#   -processes BOOLEAN                True to use processes instead of threads.
#
proc do_thread_test {args} {
  # Work on a copy of the argument list; [lshift] consumes it from the front.
  set A $args








  # The first argument is the test name. The remaining settings start from
  # these defaults and may be overridden by the options parsed below.
  set P(testname) [lshift A]
  set P(seconds) 5
  set P(init) ""
  set P(threads) [list]
  set P(processes) 0

  # Discard results left over from an earlier call. The spawned scripts
  # report into elements of the global array ::done.
  unset -nocomplain ::done


  # Parse the options. -thread may be given more than once; each occurrence
  # appends a NAME COUNT SCRIPT triple to P(threads). An unrecognised
  # option raises an error.
  while {[llength $A]>0} {
    set a [lshift A]
    switch -glob -- $a {
      -seconds {
        set P(seconds) [lshift A]
      }

      -init {
        set P(init) [lshift A]
      }

      -processes {
        set P(processes) [lshift A]
      }

      -thread {





        set name  [lshift A]
        set count [lshift A]
        set prg   [lshift A]
        lappend P(threads) [list $name $count $prg]
      }

      default {
        error "Unknown option: $a"
      }
    }
  }

  puts "Running $P(testname) for $P(seconds) seconds..."

  # Start from a fresh database: close any open handle named db and remove
  # the database file together with its journal and WAL files.
  catch { db close }
  file delete -force test.db test.db-journal test.db-wal

  # Run the -init script with a handle named db open on the new database,
  # then close the handle again before any thread or process is started.
  sqlite3 db test.db
  eval $P(init)
  db close

  # Launch COUNT instances of each -thread script. The instances of one
  # script are numbered from 1.
  foreach T $P(threads) {
    set name  [lindex $T 0]
    set count [lindex $T 1]
    set prg   [lindex $T 2]

    for {set i 1} {$i <= $count} {incr i} {
      # Build the program for this instance by substituting the user script
      # for %TEST%, the run time for %SECONDS% and the instance number for
      # %I% in the template below. The template opens its own db handle,
      # installs a busy handler, arranges for [tt_continue] to return false
      # once the run time has elapsed, and evaluates to the two-element
      # list "RC MSG" produced by running the user script under [catch].
      set program [string map [list %TEST% $prg %SECONDS% $P(seconds) %I% $i] {

        set tid %I%

        proc usleep {ms} {
          set ::usleep 0
          after $ms {set ::usleep 1}
          vwait ::usleep
        }
        proc busyhandler {n} { usleep 10 ; return 0 }

        sqlite3 db test.db
        db busy busyhandler
        db eval { SELECT randomblob($tid*5) }

        set ::finished 0
        after [expr %SECONDS% * 1000] {set ::finished 1}
        proc tt_continue {} { expr ($::finished==0) }

        set rc [catch { %TEST% } msg]

        db close
        list $rc $msg
      }]

      # Run the program in a thread or in a separate process, depending on
      # the -processes option. Either way ::done(NAME,I) is named as the
      # variable that is to receive the result.
      # NOTE(review): [sqlthread spawn] is implemented outside this file;
      # presumably it sets the variable when the thread finishes - confirm.
      if {$P(processes)==0} {
        sqlthread spawn ::done($name,$i) $program
      } else {
        testfixture_nb ::done($name,$i) $program
      }
    }
  }

  # Collect the results in launch order. For every instance one test case
  # named TESTNAME.NAME.I is run; it expects the first element of the result
  # (the [catch] return code) to be 0. The second element of each result is
  # gathered into the report line printed at the end.
  set report "  Results:"
  foreach T $P(threads) {
    set name  [lindex $T 0]
    set count [lindex $T 1]
    set prg   [lindex $T 2]

    set reslist [list]
    for {set i 1} {$i <= $count} {incr i} {
      # Blocks in the event loop until the instance has reported.
      set res [wait_for_var ::done($name,$i)]
      lappend reslist [lindex $res 1]
      do_test $P(testname).$name.$i [list lindex $res 0] 0
    }

    append report "   $name $reslist"
  }
  puts $report
}

#--------------------------------------------------------------------------
# Start NTHREAD threads. Each thread performs both read and write 
# transactions. Each read transaction consists of:
#
#   1) Reading the md5sum of all but the last table row,
#   2) Running integrity check.
#   3) Reading the value stored in the last table row,
#   4) Check that the values read in steps 1 and 3 are the same, and that
#      the md5sum of all but the last table row has not changed.
#
................................................................................
#      rows in the table.
#
# Each of the NTHREAD threads runs a single read transaction followed by a
# single write transaction in a loop, sleeping for 1 ms between iterations.
#
# There is also a single checkpointer thread. It runs the following loop:
#
#   1) Execute "PRAGMA checkpoint"
#   2) Sleep for 500 ms.
#













foreach {mode name} {
      0 walthread-1-threads 
      1 walthread-1-processes
} {

  # Run the same scenario twice: MODE is passed as the -processes option,
  # so the first pass uses threads and the second uses separate processes.
  # The -init script creates table t1 in WAL mode and fills it so that the
  # last row holds the md5sum of the rows before it. NTHREAD instances of
  # the "main" script and one instance of the "ckpt" script are started.
  # Each script loops until [tt_continue] returns false and evaluates to
  # the number of iterations it completed (nRun).
  do_thread_test $name -processes $mode -seconds $SECONDS -init {
    execsql {
      PRAGMA journal_mode = WAL;
      CREATE TABLE t1(x PRIMARY KEY);
      PRAGMA lock_status;
      INSERT INTO t1 VALUES(randomblob(100));
      INSERT INTO t1 VALUES(randomblob(100));
      INSERT INTO t1 SELECT md5sum(x) FROM t1;
    }


  } -thread main $NTHREAD {
  



    proc read_transaction {} {
      set results [db eval {
        BEGIN;


          PRAGMA integrity_check;
          SELECT md5sum(x) FROM t1 WHERE rowid != (SELECT max(rowid) FROM t1);



          SELECT x FROM t1 WHERE rowid = (SELECT max(rowid) FROM t1);


          SELECT md5sum(x) FROM t1 WHERE rowid != (SELECT max(rowid) FROM t1);
        COMMIT;
      }]
  





      if {[llength $results]!=4
       || [lindex $results 0] != "ok"
       || [lindex $results 1] != [lindex $results 2]
       || [lindex $results 2] != [lindex $results 3]
      } {
        error "Failed read transaction: $results"
      }
    }
  
    proc write_transaction {} {
      db eval {
        BEGIN;
          INSERT INTO t1 VALUES(randomblob(100));
          INSERT INTO t1 VALUES(randomblob(100));
          INSERT INTO t1 SELECT md5sum(x) FROM t1;
        COMMIT;
      }
    }



  



    set nRun 0

    while {[tt_continue]} {
      read_transaction
      write_transaction 

      usleep 1
      incr nRun
    }

















    set nRun
  

  } -thread ckpt 1 {
    set nRun 0

    while {[tt_continue]} {
      db eval "PRAGMA checkpoint"
      usleep 500
      incr nRun
    }
























    set nRun
  }



}


finish_test