/ Check-in [445bfe97]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Add sqlite3_changeset_apply_v2() and apply_v2_strm() to the sessions module.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | sessions-rebase
Files: files | file ages | folders
SHA3-256:445bfe977d9f3a891e08ef33237862ed047fe83e134ef3ed8b47ee0f5abd8cd6
User & Date: dan 2018-03-13 20:31:23
Context
2018-03-14
21:06
Add largely untested APIs for rebasing changesets. check-in: 39915b68 user: dan tags: sessions-rebase
2018-03-13
20:31
Add sqlite3_changeset_apply_v2() and apply_v2_strm() to the sessions module. check-in: 445bfe97 user: dan tags: sessions-rebase
2018-03-12
21:09
Fix a typo causing SQLITE_LOG_CACHE_SPILL builds to fail. check-in: 0171d4a7 user: dan tags: trunk
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Added ext/session/sessionrebase.test.



























































































































































































































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# 2018 March 14
#
# The author disclaims copyright to this source code.  In place of
# a legal notice, here is a blessing:
#
#    May you do good and not evil.
#    May you find forgiveness for yourself and forgive others.
#    May you share freely, never taking more than you give.
#
#***********************************************************************
# This file implements regression tests for SQLite library.
#

# If the caller has not already set $testdir, default it to the "test"
# directory two levels above the directory containing this script.
if {![info exists testdir]} {
  set testdir [file join [file dirname [info script]] .. .. test]
} 
# Load the session-module helper procs that live alongside this script,
# then the main test harness from $testdir.
source [file join [file dirname [info script]] session_common.tcl]
source $testdir/tester.tcl
# Finish immediately when the "session" capability is not available
# (ifcapable is presumably provided by tester.tcl - it is not defined here).
ifcapable !session {finish_test; return}

# Prefix used for the test names in this file.
set testprefix sessionrebase

# ::lConflict is a queue of values for the conflict handler to return.
# Each call to [xConflict] removes the first element of the queue and
# returns it. Any arguments passed to the handler are ignored. If the
# queue is empty, an empty string is returned and the queue stays empty.
set ::lConflict {}
proc xConflict {args} {
  # [lassign] stores the first element in $head and returns the remainder.
  set ::lConflict [lassign $::lConflict head]
  return $head
}

#-------------------------------------------------------------------------
# The following test cases - 1.* - test that the rebase blobs output by
# sqlite3_changeset_apply_v2 look correct in some simple cases. The blob
# is itself a changeset, containing records determined as follows:
#
#   * For each conflict resolved with REPLACE, the rebase blob contains
#     a DELETE record. All fields other than the PK fields are undefined.
#
#   * For each conflict resolved with OMIT, the rebase blob contains an
#     INSERT record. For an INSERT or UPDATE operation, the indirect flag
#     is clear and all updated fields are defined. For a DELETE operation,
#     the indirect flag is set and all non-PK fields are left undefined.
#
# Run one rebase-blob test case named $tn.
#
#   sql               SQL used to generate the changeset under test.
#   modsql            SQL run against the database before the changeset is
#                     applied, so that applying it produces conflicts.
#   conflict_handler  List of values for [xConflict] to return, in order.
#   res               Expected list form of the resulting rebase blob.
#
proc do_apply_v2_test {tn sql modsql conflict_handler res} {

  # Phase 1: record the effect of $sql as a changeset inside a transaction
  # that is then rolled back, leaving the database as it was.
  execsql BEGIN
  sqlite3session S db main
  S attach *
  execsql $sql
  set cs [S changeset]
  S delete
  execsql ROLLBACK

  # Phase 2: apply $modsql, then apply the changeset on top of it with
  # [xConflict] as the conflict handler, keeping the returned rebase blob.
  # Everything is rolled back again afterwards.
  execsql BEGIN
  execsql $modsql
  set ::lConflict $conflict_handler
  set rebase [sqlite3changeset_apply_v2 db $cs xConflict]
  execsql ROLLBACK

  # Phase 3: compare the decoded rebase blob against the expected records.
  set script [list changeset_to_list $rebase]
  set expect [list {*}$res]
  uplevel [list do_test $tn $script $expect]
}

# Base schema for the 1.* tests: a single table with an INTEGER PRIMARY KEY
# and one other column, containing one row.
do_execsql_test 1.0 {
  CREATE TABLE t1(a INTEGER PRIMARY KEY, b);
  INSERT INTO t1 VALUES(1, 'value A');
}

# 1.1.*: the changeset UPDATEs row a=1 while the database has UPDATEd the
# same row to a different value. Resolved with OMIT (1.1.1) the expected
# rebase record is an INSERT carrying the changeset's new value; resolved
# with REPLACE (1.1.2) it is a DELETE carrying only the PK value.
do_apply_v2_test 1.1.1 {
  UPDATE t1 SET b = 'value B' WHERE a=1;
} {
  UPDATE t1 SET b = 'value C' WHERE a=1;
} {
  OMIT
} {
  {INSERT t1 0 X. {} {i 1 t {value B}}}
}
do_apply_v2_test 1.1.2 {
  UPDATE t1 SET b = 'value B' WHERE a=1;
} {
  UPDATE t1 SET b = 'value C' WHERE a=1;
} {
  REPLACE
} {
  {DELETE t1 0 X. {i 1 {} {}} {}}
}

# 1.2.*: the changeset INSERTs a row with a=2 while the database already
# holds a different row with the same primary key.
do_apply_v2_test 1.2.1 {
  INSERT INTO t1 VALUES(2, 'first');
} {
  INSERT INTO t1 VALUES(2, 'second');
} {
  OMIT
} {
  {INSERT t1 0 X. {} {i 2 t first}}
}
do_apply_v2_test 1.2.2 {
  INSERT INTO t1 VALUES(2, 'first');
} {
  INSERT INTO t1 VALUES(2, 'second');
} {
  REPLACE
} {
  {DELETE t1 0 X. {i 2 {} {}} {}}
}

# 1.3.*: the changeset DELETEs row a=1 while the database has UPDATEd it.
# With OMIT (1.3.1) the expected INSERT record has its third field set to 1
# (the indirect flag, per the description of the 1.* tests) and no non-PK
# values.
do_apply_v2_test 1.3.1 {
  DELETE FROM t1 WHERE a=1;
} {
  UPDATE t1 SET b='value D' WHERE a=1;
} {
  OMIT
} {
  {INSERT t1 1 X. {} {i 1 {} {}}}
}
do_apply_v2_test 1.3.2 {
  DELETE FROM t1 WHERE a=1;
} {
  UPDATE t1 SET b='value D' WHERE a=1;
} {
  REPLACE
} {
  {DELETE t1 0 X. {i 1 {} {}} {}}
}


finish_test

Changes to ext/session/sqlite3session.c.

3406
3407
3408
3409
3410
3411
3412


3413
3414
3415
3416
3417
3418
3419
....
3786
3787
3788
3789
3790
3791
3792






















































3793
3794
3795
3796
3797
3798
3799
....
3862
3863
3864
3865
3866
3867
3868
3869
3870
3871
3872
3873
3874
3875
3876
....
3888
3889
3890
3891
3892
3893
3894



3895
3896
3897
3898
3899
3900
3901
....
4063
4064
4065
4066
4067
4068
4069
4070
4071
4072
4073
4074
4075
4076
4077
4078
4079
4080
4081
4082
4083
4084
4085
4086
4087
4088
4089
4090
4091
4092
4093
4094
4095
4096
4097
4098
4099
4100
4101
4102
4103
4104
4105

4106
4107
4108
4109
4110
4111
4112
....
4174
4175
4176
4177
4178
4179
4180
4181

4182
4183
4184
4185
4186
4187
4188
....
4215
4216
4217
4218
4219
4220
4221

4222
4223
4224
4225
4226
4227
4228
....
4324
4325
4326
4327
4328
4329
4330





4331
4332
4333
4334
4335
4336

4337
4338
4339


























4340
4341
4342
4343
4344
4345
4346
....
4354
4355
4356
4357
4358
4359
4360
4361
4362
4363
4364
4365
4366

4367
4368
4369
4370
4371
4372
4373

























4374
4375
4376
4377
4378
4379
4380
....
4382
4383
4384
4385
4386
4387
4388
4389
4390
4391
4392
4393
4394

4395
4396
4397
4398
4399
4400
4401
  sqlite3_stmt *pSelect;          /* SELECT statement */
  int nCol;                       /* Size of azCol[] and abPK[] arrays */
  const char **azCol;             /* Array of column names */
  u8 *abPK;                       /* Boolean array - true if column is in PK */
  int bStat1;                     /* True if table is sqlite_stat1 */
  int bDeferConstraints;          /* True to defer constraints */
  SessionBuffer constraints;      /* Deferred constraints are stored here */


};

/*
** Formulate a statement to DELETE a row from database db. Assuming a table
** structure like this:
**
**     CREATE TABLE x(a, b, c, d, PRIMARY KEY(a, c));
................................................................................
  if( rc==SQLITE_OK ){
    rc = sqlite3_step(pSelect);
    if( rc!=SQLITE_ROW ) rc = sqlite3_reset(pSelect);
  }

  return rc;
}























































/*
** Invoke the conflict handler for the change that the changeset iterator
** currently points to.
**
** Argument eType must be either CHANGESET_DATA or CHANGESET_CONFLICT.
** If argument pbReplace is NULL, then the type of conflict handler invoked
................................................................................
  }else if( rc==SQLITE_OK ){
    if( p->bDeferConstraints && eType==SQLITE_CHANGESET_CONFLICT ){
      /* Instead of invoking the conflict handler, append the change blob
      ** to the SessionApplyCtx.constraints buffer. */
      u8 *aBlob = &pIter->in.aData[pIter->in.iCurrent];
      int nBlob = pIter->in.iNext - pIter->in.iCurrent;
      sessionAppendBlob(&p->constraints, aBlob, nBlob, &rc);
      res = SQLITE_CHANGESET_OMIT;
    }else{
      /* No other row with the new.* primary key. */
      res = xConflict(pCtx, eType+1, pIter);
      if( res==SQLITE_CHANGESET_REPLACE ) rc = SQLITE_MISUSE;
    }
  }

................................................................................
        rc = SQLITE_ABORT;
        break;

      default:
        rc = SQLITE_MISUSE;
        break;
    }



  }

  return rc;
}

/*
** Attempt to apply the change that the iterator passed as the first argument
................................................................................
  void *pCtx                      /* First argument passed to xConflict */
){
  int bReplace = 0;
  int bRetry = 0;
  int rc;

  rc = sessionApplyOneOp(pIter, pApply, xConflict, pCtx, &bReplace, &bRetry);
  assert( rc==SQLITE_OK || (bRetry==0 && bReplace==0) );

  /* If the bRetry flag is set, the change has not been applied due to an
  ** SQLITE_CHANGESET_DATA problem (i.e. this is an UPDATE or DELETE and
  ** a row with the correct PK is present in the db, but one or more other
  ** fields do not contain the expected values) and the conflict handler 
  ** returned SQLITE_CHANGESET_REPLACE. In this case retry the operation,
  ** but pass NULL as the final argument so that sessionApplyOneOp() ignores
  ** the SQLITE_CHANGESET_DATA problem.  */
  if( bRetry ){
    assert( pIter->op==SQLITE_UPDATE || pIter->op==SQLITE_DELETE );
    rc = sessionApplyOneOp(pIter, pApply, xConflict, pCtx, 0, 0);
  }

  /* If the bReplace flag is set, the change is an INSERT that has not
  ** been performed because the database already contains a row with the
  ** specified primary key and the conflict handler returned
  ** SQLITE_CHANGESET_REPLACE. In this case remove the conflicting row
  ** before reattempting the INSERT.  */
  else if( bReplace ){
    assert( pIter->op==SQLITE_INSERT );
    rc = sqlite3_exec(db, "SAVEPOINT replace_op", 0, 0, 0);
    if( rc==SQLITE_OK ){
      rc = sessionBindRow(pIter, 
          sqlite3changeset_new, pApply->nCol, pApply->abPK, pApply->pDelete);
      sqlite3_bind_int(pApply->pDelete, pApply->nCol+1, 1);
    }
    if( rc==SQLITE_OK ){
      sqlite3_step(pApply->pDelete);
      rc = sqlite3_reset(pApply->pDelete);
    }
    if( rc==SQLITE_OK ){
      rc = sessionApplyOneOp(pIter, pApply, xConflict, pCtx, 0, 0);
    }
    if( rc==SQLITE_OK ){
      rc = sqlite3_exec(db, "RELEASE replace_op", 0, 0, 0);

    }
  }

  return rc;
}

/*
................................................................................
    const char *zTab              /* Table name */
  ),
  int(*xConflict)(
    void *pCtx,                   /* Copy of fifth arg to _apply() */
    int eConflict,                /* DATA, MISSING, CONFLICT, CONSTRAINT */
    sqlite3_changeset_iter *p     /* Handle describing change and conflict */
  ),
  void *pCtx                      /* First argument passed to xConflict */

){
  int schemaMismatch = 0;
  int rc;                         /* Return code */
  const char *zTab = 0;           /* Name of current table */
  int nTab = 0;                   /* Result of sqlite3Strlen30(zTab) */
  SessionApplyCtx sApply;         /* changeset_apply() context object */
  int bPatchset;
................................................................................
      sqlite3_finalize(sApply.pDelete);
      sqlite3_finalize(sApply.pUpdate); 
      sqlite3_finalize(sApply.pInsert);
      sqlite3_finalize(sApply.pSelect);
      memset(&sApply, 0, sizeof(sApply));
      sApply.db = db;
      sApply.bDeferConstraints = 1;


      /* If an xFilter() callback was specified, invoke it now. If the 
      ** xFilter callback returns zero, skip this table. If it returns
      ** non-zero, proceed. */
      schemaMismatch = (xFilter && (0==xFilter(pCtx, zNew)));
      if( schemaMismatch ){
        zTab = sqlite3_mprintf("%s", zNew);
................................................................................
  if( rc==SQLITE_OK ){
    rc = sqlite3_exec(db, "RELEASE changeset_apply", 0, 0, 0);
  }else{
    sqlite3_exec(db, "ROLLBACK TO changeset_apply", 0, 0, 0);
    sqlite3_exec(db, "RELEASE changeset_apply", 0, 0, 0);
  }






  sqlite3_finalize(sApply.pInsert);
  sqlite3_finalize(sApply.pDelete);
  sqlite3_finalize(sApply.pUpdate);
  sqlite3_finalize(sApply.pSelect);
  sqlite3_free((char*)sApply.azCol);  /* cast works around VC++ bug */
  sqlite3_free((char*)sApply.constraints.aBuf);

  sqlite3_mutex_leave(sqlite3_db_mutex(db));
  return rc;
}



























/*
** Apply the changeset passed via pChangeset/nChangeset to the main database
** attached to handle "db". Invoke the supplied conflict handler callback
** to resolve any conflicts encountered while applying the change.
*/
int sqlite3changeset_apply(
................................................................................
  int(*xConflict)(
    void *pCtx,                   /* Copy of fifth arg to _apply() */
    int eConflict,                /* DATA, MISSING, CONFLICT, CONSTRAINT */
    sqlite3_changeset_iter *p     /* Handle describing change and conflict */
  ),
  void *pCtx                      /* First argument passed to xConflict */
){
  sqlite3_changeset_iter *pIter;  /* Iterator to skip through changeset */  
  int rc = sqlite3changeset_start(&pIter, nChangeset, pChangeset);
  if( rc==SQLITE_OK ){
    rc = sessionChangesetApply(db, pIter, xFilter, xConflict, pCtx);
  }
  return rc;

}

/*
** Apply the changeset passed via xInput/pIn to the main database
** attached to handle "db". Invoke the supplied conflict handler callback
** to resolve any conflicts encountered while applying the change.
*/

























int sqlite3changeset_apply_strm(
  sqlite3 *db,                    /* Apply change to "main" db of this handle */
  int (*xInput)(void *pIn, void *pData, int *pnData), /* Input function */
  void *pIn,                                          /* First arg for xInput */
  int(*xFilter)(
    void *pCtx,                   /* Copy of sixth arg to _apply() */
    const char *zTab              /* Table name */
................................................................................
  int(*xConflict)(
    void *pCtx,                   /* Copy of sixth arg to _apply() */
    int eConflict,                /* DATA, MISSING, CONFLICT, CONSTRAINT */
    sqlite3_changeset_iter *p     /* Handle describing change and conflict */
  ),
  void *pCtx                      /* First argument passed to xConflict */
){
  sqlite3_changeset_iter *pIter;  /* Iterator to skip through changeset */  
  int rc = sqlite3changeset_start_strm(&pIter, xInput, pIn);
  if( rc==SQLITE_OK ){
    rc = sessionChangesetApply(db, pIter, xFilter, xConflict, pCtx);
  }
  return rc;

}

/*
** sqlite3_changegroup handle.
*/
struct sqlite3_changegroup {
  int rc;                         /* Error code */







>
>







 







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







|







 







>
>
>







 







|
<
|
|
|
|
|
|
|
|
|
|
|

|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
>







 







|
>







 







>







 







>
>
>
>
>






>



>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







<
|
<
|
<
<
>







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







<
|
<
|
<
<
>







3406
3407
3408
3409
3410
3411
3412
3413
3414
3415
3416
3417
3418
3419
3420
3421
....
3788
3789
3790
3791
3792
3793
3794
3795
3796
3797
3798
3799
3800
3801
3802
3803
3804
3805
3806
3807
3808
3809
3810
3811
3812
3813
3814
3815
3816
3817
3818
3819
3820
3821
3822
3823
3824
3825
3826
3827
3828
3829
3830
3831
3832
3833
3834
3835
3836
3837
3838
3839
3840
3841
3842
3843
3844
3845
3846
3847
3848
3849
3850
3851
3852
3853
3854
3855
....
3918
3919
3920
3921
3922
3923
3924
3925
3926
3927
3928
3929
3930
3931
3932
....
3944
3945
3946
3947
3948
3949
3950
3951
3952
3953
3954
3955
3956
3957
3958
3959
3960
....
4122
4123
4124
4125
4126
4127
4128
4129

4130
4131
4132
4133
4134
4135
4136
4137
4138
4139
4140
4141
4142
4143
4144
4145
4146
4147
4148
4149
4150
4151
4152
4153
4154
4155
4156
4157
4158
4159
4160
4161
4162
4163
4164
4165
4166
4167
4168
4169
4170
4171
....
4233
4234
4235
4236
4237
4238
4239
4240
4241
4242
4243
4244
4245
4246
4247
4248
....
4275
4276
4277
4278
4279
4280
4281
4282
4283
4284
4285
4286
4287
4288
4289
....
4385
4386
4387
4388
4389
4390
4391
4392
4393
4394
4395
4396
4397
4398
4399
4400
4401
4402
4403
4404
4405
4406
4407
4408
4409
4410
4411
4412
4413
4414
4415
4416
4417
4418
4419
4420
4421
4422
4423
4424
4425
4426
4427
4428
4429
4430
4431
4432
4433
4434
4435
4436
4437
4438
4439
....
4447
4448
4449
4450
4451
4452
4453

4454

4455


4456
4457
4458
4459
4460
4461
4462
4463
4464
4465
4466
4467
4468
4469
4470
4471
4472
4473
4474
4475
4476
4477
4478
4479
4480
4481
4482
4483
4484
4485
4486
4487
4488
4489
4490
4491
4492
4493
4494
4495
....
4497
4498
4499
4500
4501
4502
4503

4504

4505


4506
4507
4508
4509
4510
4511
4512
4513
  sqlite3_stmt *pSelect;          /* SELECT statement */
  int nCol;                       /* Size of azCol[] and abPK[] arrays */
  const char **azCol;             /* Array of column names */
  u8 *abPK;                       /* Boolean array - true if column is in PK */
  int bStat1;                     /* True if table is sqlite_stat1 */
  int bDeferConstraints;          /* True to defer constraints */
  SessionBuffer constraints;      /* Deferred constraints are stored here */
  SessionBuffer rebase;           /* Rebase information (if any) here */
  int bRebaseStarted;             /* If table header is already in rebase */
};

/*
** Formulate a statement to DELETE a row from database db. Assuming a table
** structure like this:
**
**     CREATE TABLE x(a, b, c, d, PRIMARY KEY(a, c));
................................................................................
  if( rc==SQLITE_OK ){
    rc = sqlite3_step(pSelect);
    if( rc!=SQLITE_ROW ) rc = sqlite3_reset(pSelect);
  }

  return rc;
}

/*
** Append a record to the rebase buffer (p->rebase) describing how the
** conflict on the change that pIter currently points to was resolved.
** Argument eType is the resolution - either SQLITE_CHANGESET_REPLACE or
** SQLITE_CHANGESET_OMIT.
**
** If p->bRebaseStarted is clear, a table header is written first ('T',
** the column count as a varint, the abPK[] array and the nul-terminated
** table name) and the flag is set.
**
**   * REPLACE: a DELETE record is written with the flag byte that follows
**     the opcode set to 0. PK columns hold the new.* value if the change
**     is an INSERT, or the old.* value otherwise. Each non-PK column is
**     written as a single 0x00 byte.
**
**   * OMIT: an INSERT record is written. The flag byte that follows the
**     opcode is 1 if the change is a DELETE, or 0 otherwise. PK columns of
**     UPDATE and DELETE changes hold the old.* value. All other columns
**     hold whatever sqlite3changeset_new() supplies (the value pointer is
**     left NULL if it supplies nothing).
**
** The return value is the rc variable as updated by the sessionAppendXXX()
** calls, each of which is passed a pointer to it.
*/
static int sessionRebaseAdd(
  SessionApplyCtx *p, 
  int eType, 
  sqlite3_changeset_iter *pIter
){
  SessionBuffer *pOut = &p->rebase;
  int op = pIter->op;
  int rc = SQLITE_OK;
  int iCol;

  if( !p->bRebaseStarted ){
    /* First record for this table - write the table header. */
    const char *zName = pIter->zTab;
    sessionAppendByte(pOut, 'T', &rc);
    sessionAppendVarint(pOut, p->nCol, &rc);
    sessionAppendBlob(pOut, p->abPK, p->nCol, &rc);
    sessionAppendBlob(pOut, (u8*)zName, (int)strlen(zName)+1, &rc);
    p->bRebaseStarted = 1;
  }

  assert( eType==SQLITE_CHANGESET_REPLACE||eType==SQLITE_CHANGESET_OMIT );
  assert( op==SQLITE_DELETE || op==SQLITE_INSERT || op==SQLITE_UPDATE );

  if( eType!=SQLITE_CHANGESET_REPLACE ){
    /* Conflict resolved with OMIT: write an INSERT record. */
    sessionAppendByte(pOut, SQLITE_INSERT, &rc);
    sessionAppendByte(pOut, op==SQLITE_DELETE, &rc);
    for(iCol=0; iCol<p->nCol; iCol++){
      sqlite3_value *pValue = 0;
      if( op==SQLITE_INSERT || !p->abPK[iCol] ){
        sqlite3changeset_new(pIter, iCol, &pValue);
      }else{
        sqlite3changeset_old(pIter, iCol, &pValue);
      }
      sessionAppendValue(pOut, pValue, &rc);
    }
  }else{
    /* Conflict resolved with REPLACE: write a DELETE record that defines
    ** the PK columns only. */
    sessionAppendByte(pOut, SQLITE_DELETE, &rc);
    sessionAppendByte(pOut, 0, &rc);
    for(iCol=0; iCol<p->nCol; iCol++){
      if( p->abPK[iCol] ){
        sqlite3_value *pValue = 0;
        if( op==SQLITE_INSERT ){
          sqlite3changeset_new(pIter, iCol, &pValue);
        }else{
          sqlite3changeset_old(pIter, iCol, &pValue);
        }
        sessionAppendValue(pOut, pValue, &rc);
      }else{
        sessionAppendByte(pOut, 0, &rc);
      }
    }
  }

  return rc;
}

/*
** Invoke the conflict handler for the change that the changeset iterator
** currently points to.
**
** Argument eType must be either CHANGESET_DATA or CHANGESET_CONFLICT.
** If argument pbReplace is NULL, then the type of conflict handler invoked
................................................................................
  }else if( rc==SQLITE_OK ){
    if( p->bDeferConstraints && eType==SQLITE_CHANGESET_CONFLICT ){
      /* Instead of invoking the conflict handler, append the change blob
      ** to the SessionApplyCtx.constraints buffer. */
      u8 *aBlob = &pIter->in.aData[pIter->in.iCurrent];
      int nBlob = pIter->in.iNext - pIter->in.iCurrent;
      sessionAppendBlob(&p->constraints, aBlob, nBlob, &rc);
      return SQLITE_OK;
    }else{
      /* No other row with the new.* primary key. */
      res = xConflict(pCtx, eType+1, pIter);
      if( res==SQLITE_CHANGESET_REPLACE ) rc = SQLITE_MISUSE;
    }
  }

................................................................................
        rc = SQLITE_ABORT;
        break;

      default:
        rc = SQLITE_MISUSE;
        break;
    }
    if( rc==SQLITE_OK ){
      rc = sessionRebaseAdd(p, res, pIter);
    }
  }

  return rc;
}

/*
** Attempt to apply the change that the iterator passed as the first argument
................................................................................
  void *pCtx                      /* First argument passed to xConflict */
){
  int bReplace = 0;
  int bRetry = 0;
  int rc;

  rc = sessionApplyOneOp(pIter, pApply, xConflict, pCtx, &bReplace, &bRetry);
  if( rc==SQLITE_OK ){

    /* If the bRetry flag is set, the change has not been applied due to an
    ** SQLITE_CHANGESET_DATA problem (i.e. this is an UPDATE or DELETE and
    ** a row with the correct PK is present in the db, but one or more other
    ** fields do not contain the expected values) and the conflict handler 
    ** returned SQLITE_CHANGESET_REPLACE. In this case retry the operation,
    ** but pass NULL as the final argument so that sessionApplyOneOp() ignores
    ** the SQLITE_CHANGESET_DATA problem.  */
    if( bRetry ){
      assert( pIter->op==SQLITE_UPDATE || pIter->op==SQLITE_DELETE );
      rc = sessionApplyOneOp(pIter, pApply, xConflict, pCtx, 0, 0);
    }

    /* If the bReplace flag is set, the change is an INSERT that has not
    ** been performed because the database already contains a row with the
    ** specified primary key and the conflict handler returned
    ** SQLITE_CHANGESET_REPLACE. In this case remove the conflicting row
    ** before reattempting the INSERT.  */
    else if( bReplace ){
      assert( pIter->op==SQLITE_INSERT );
      rc = sqlite3_exec(db, "SAVEPOINT replace_op", 0, 0, 0);
      if( rc==SQLITE_OK ){
        rc = sessionBindRow(pIter, 
            sqlite3changeset_new, pApply->nCol, pApply->abPK, pApply->pDelete);
        sqlite3_bind_int(pApply->pDelete, pApply->nCol+1, 1);
      }
      if( rc==SQLITE_OK ){
        sqlite3_step(pApply->pDelete);
        rc = sqlite3_reset(pApply->pDelete);
      }
      if( rc==SQLITE_OK ){
        rc = sessionApplyOneOp(pIter, pApply, xConflict, pCtx, 0, 0);
      }
      if( rc==SQLITE_OK ){
        rc = sqlite3_exec(db, "RELEASE replace_op", 0, 0, 0);
      }
    }
  }

  return rc;
}

/*
................................................................................
    const char *zTab              /* Table name */
  ),
  int(*xConflict)(
    void *pCtx,                   /* Copy of fifth arg to _apply() */
    int eConflict,                /* DATA, MISSING, CONFLICT, CONSTRAINT */
    sqlite3_changeset_iter *p     /* Handle describing change and conflict */
  ),
  void *pCtx,                     /* First argument passed to xConflict */
  void **ppRebase, int *pnRebase  /* OUT: Rebase information */
){
  int schemaMismatch = 0;
  int rc;                         /* Return code */
  const char *zTab = 0;           /* Name of current table */
  int nTab = 0;                   /* Result of sqlite3Strlen30(zTab) */
  SessionApplyCtx sApply;         /* changeset_apply() context object */
  int bPatchset;
................................................................................
      sqlite3_finalize(sApply.pDelete);
      sqlite3_finalize(sApply.pUpdate); 
      sqlite3_finalize(sApply.pInsert);
      sqlite3_finalize(sApply.pSelect);
      memset(&sApply, 0, sizeof(sApply));
      sApply.db = db;
      sApply.bDeferConstraints = 1;
      sApply.bRebaseStarted = 0;

      /* If an xFilter() callback was specified, invoke it now. If the 
      ** xFilter callback returns zero, skip this table. If it returns
      ** non-zero, proceed. */
      schemaMismatch = (xFilter && (0==xFilter(pCtx, zNew)));
      if( schemaMismatch ){
        zTab = sqlite3_mprintf("%s", zNew);
................................................................................
  if( rc==SQLITE_OK ){
    rc = sqlite3_exec(db, "RELEASE changeset_apply", 0, 0, 0);
  }else{
    sqlite3_exec(db, "ROLLBACK TO changeset_apply", 0, 0, 0);
    sqlite3_exec(db, "RELEASE changeset_apply", 0, 0, 0);
  }

  if( rc==SQLITE_OK && ppRebase && pnRebase ){
    *ppRebase = (void*)sApply.rebase.aBuf;
    *pnRebase = sApply.rebase.nBuf;
    sApply.rebase.aBuf = 0;
  }
  sqlite3_finalize(sApply.pInsert);
  sqlite3_finalize(sApply.pDelete);
  sqlite3_finalize(sApply.pUpdate);
  sqlite3_finalize(sApply.pSelect);
  sqlite3_free((char*)sApply.azCol);  /* cast works around VC++ bug */
  sqlite3_free((char*)sApply.constraints.aBuf);
  sqlite3_free((char*)sApply.rebase.aBuf);
  sqlite3_mutex_leave(sqlite3_db_mutex(db));
  return rc;
}

/*
** Apply the changeset passed via pChangeset/nChangeset to the main database
** attached to handle "db". Invoke the supplied conflict handler callback
** to resolve any conflicts encountered while applying the change.
**
** ppRebase and pnRebase are optional output parameters (either may be
** NULL). They are forwarded to sessionChangesetApply(), which uses them
** to return the rebase buffer and its size in bytes. Both are set to
** NULL/0 before any other work is done, so that they hold defined values
** even if this function returns an error.
*/
int sqlite3changeset_apply_v2(
  sqlite3 *db,                    /* Apply change to "main" db of this handle */
  int nChangeset,                 /* Size of changeset in bytes */
  void *pChangeset,               /* Changeset blob */
  int(*xFilter)(
    void *pCtx,                   /* Copy of sixth arg to _apply() */
    const char *zTab              /* Table name */
  ),
  int(*xConflict)(
    void *pCtx,                   /* Copy of sixth arg to _apply() */
    int eConflict,                /* DATA, MISSING, CONFLICT, CONSTRAINT */
    sqlite3_changeset_iter *p     /* Handle describing change and conflict */
  ),
  void *pCtx,                     /* First argument passed to xConflict */
  void **ppRebase, int *pnRebase  /* OUT: Rebase information */
){
  sqlite3_changeset_iter *pIter;  /* Iterator to skip through changeset */  
  int rc;                         /* Return code */

  /* Give the output parameters defined values up front. Without this, a
  ** failure in sqlite3changeset_start() or while applying the changeset
  ** would leave them holding whatever the caller's variables contained. */
  if( ppRebase ) *ppRebase = 0;
  if( pnRebase ) *pnRebase = 0;

  rc = sqlite3changeset_start(&pIter, nChangeset, pChangeset);
  if( rc==SQLITE_OK ){
    rc = sessionChangesetApply(
        db, pIter, xFilter, xConflict, pCtx, ppRebase, pnRebase
    );
  }
  return rc;
}

/*
** Apply the changeset passed via pChangeset/nChangeset to the main database
** attached to handle "db". Invoke the supplied conflict handler callback
** to resolve any conflicts encountered while applying the change.
*/
int sqlite3changeset_apply(
................................................................................
  int(*xConflict)(
    void *pCtx,                   /* Copy of fifth arg to _apply() */
    int eConflict,                /* DATA, MISSING, CONFLICT, CONSTRAINT */
    sqlite3_changeset_iter *p     /* Handle describing change and conflict */
  ),
  void *pCtx                      /* First argument passed to xConflict */
){

  return sqlite3changeset_apply_v2(

      db, nChangeset, pChangeset, xFilter, xConflict, pCtx, 0, 0


  );
}

/*
** Apply the changeset passed via xInput/pIn to the main database
** attached to handle "db". Invoke the supplied conflict handler callback
** to resolve any conflicts encountered while applying the change.
**
** ppRebase and pnRebase are optional output parameters (either may be
** NULL). They are forwarded to sessionChangesetApply(), which uses them
** to return the rebase buffer and its size in bytes. Both are set to
** NULL/0 before any other work is done, so that they hold defined values
** even if this function returns an error.
*/
int sqlite3changeset_apply_v2_strm(
  sqlite3 *db,                    /* Apply change to "main" db of this handle */
  int (*xInput)(void *pIn, void *pData, int *pnData), /* Input function */
  void *pIn,                                          /* First arg for xInput */
  int(*xFilter)(
    void *pCtx,                   /* Copy of sixth arg to _apply() */
    const char *zTab              /* Table name */
  ),
  int(*xConflict)(
    void *pCtx,                   /* Copy of sixth arg to _apply() */
    int eConflict,                /* DATA, MISSING, CONFLICT, CONSTRAINT */
    sqlite3_changeset_iter *p     /* Handle describing change and conflict */
  ),
  void *pCtx,                     /* First argument passed to xConflict */
  void **ppRebase, int *pnRebase  /* OUT: Rebase information */
){
  sqlite3_changeset_iter *pIter;  /* Iterator to skip through changeset */  
  int rc;                         /* Return code */

  /* Give the output parameters defined values up front. Without this, a
  ** failure in sqlite3changeset_start_strm() or while applying the
  ** changeset would leave them holding whatever the caller's variables
  ** contained. */
  if( ppRebase ) *ppRebase = 0;
  if( pnRebase ) *pnRebase = 0;

  rc = sqlite3changeset_start_strm(&pIter, xInput, pIn);
  if( rc==SQLITE_OK ){
    rc = sessionChangesetApply(
        db, pIter, xFilter, xConflict, pCtx, ppRebase, pnRebase
    );
  }
  return rc;
}
int sqlite3changeset_apply_strm(
  sqlite3 *db,                    /* Apply change to "main" db of this handle */
  int (*xInput)(void *pIn, void *pData, int *pnData), /* Input function */
  void *pIn,                                          /* First arg for xInput */
  int(*xFilter)(
    void *pCtx,                   /* Copy of sixth arg to _apply() */
    const char *zTab              /* Table name */
................................................................................
  int(*xConflict)(
    void *pCtx,                   /* Copy of sixth arg to _apply() */
    int eConflict,                /* DATA, MISSING, CONFLICT, CONSTRAINT */
    sqlite3_changeset_iter *p     /* Handle describing change and conflict */
  ),
  void *pCtx                      /* First argument passed to xConflict */
){

  return sqlite3changeset_apply_v2_strm(

      db, xInput, pIn, xFilter, xConflict, pCtx, 0, 0


  );
}

/*
** sqlite3_changegroup handle.
*/
struct sqlite3_changegroup {
  int rc;                         /* Error code */

Changes to ext/session/sqlite3session.h.

1098
1099
1100
1101
1102
1103
1104

















1105
1106
1107
1108
1109
1110
1111
....
1298
1299
1300
1301
1302
1303
1304
















1305
1306
1307
1308
1309
1310
1311
  int(*xConflict)(
    void *pCtx,                   /* Copy of sixth arg to _apply() */
    int eConflict,                /* DATA, MISSING, CONFLICT, CONSTRAINT */
    sqlite3_changeset_iter *p     /* Handle describing change and conflict */
  ),
  void *pCtx                      /* First argument passed to xConflict */
);


















/* 
** CAPI3REF: Constants Passed To The Conflict Handler
**
** Values that may be passed as the second argument to a conflict-handler.
**
** <dl>
................................................................................
  ),
  int(*xConflict)(
    void *pCtx,                   /* Copy of sixth arg to _apply() */
    int eConflict,                /* DATA, MISSING, CONFLICT, CONSTRAINT */
    sqlite3_changeset_iter *p     /* Handle describing change and conflict */
  ),
  void *pCtx                      /* First argument passed to xConflict */
















);
int sqlite3changeset_concat_strm(
  int (*xInputA)(void *pIn, void *pData, int *pnData),
  void *pInA,
  int (*xInputB)(void *pIn, void *pData, int *pnData),
  void *pInB,
  int (*xOutput)(void *pOut, const void *pData, int nData),







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
....
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
  int(*xConflict)(
    void *pCtx,                   /* Copy of sixth arg to _apply() */
    int eConflict,                /* DATA, MISSING, CONFLICT, CONSTRAINT */
    sqlite3_changeset_iter *p     /* Handle describing change and conflict */
  ),
  void *pCtx                      /* First argument passed to xConflict */
);

/*
** Variant of sqlite3changeset_apply() that takes the same first six
** parameters plus two trailing output parameters, ppRebase and pnRebase.
**
** NOTE(review): the semantics of the two extra parameters are not spelled
** out in this header. The test harness in test_session.c passes pointers
** to a zero-initialized "void*" / "int" pair, treats a non-NULL *ppRebase
** as a buffer of *pnRebase bytes on success, and releases it with
** sqlite3_free(). Confirm against the implementation before relying on it.
*/
int sqlite3changeset_apply_v2(
  sqlite3 *db,                    /* Apply change to "main" db of this handle */
  int nChangeset,                 /* Size of changeset in bytes */
  void *pChangeset,               /* Changeset blob */
  int(*xFilter)(
    void *pCtx,                   /* Copy of sixth arg to _apply() */
    const char *zTab              /* Table name */
  ),
  int(*xConflict)(
    void *pCtx,                   /* Copy of sixth arg to _apply() */
    int eConflict,                /* DATA, MISSING, CONFLICT, CONSTRAINT */
    sqlite3_changeset_iter *p     /* Handle describing change and conflict */
  ),
  void *pCtx,                     /* First argument passed to xConflict */
  /* OUT: buffer pointer and its size in bytes (presumably rebase data;
  ** see the review note above). */
  void **ppRebase, int *pnRebase
);

/* 
** CAPI3REF: Constants Passed To The Conflict Handler
**
** Values that may be passed as the second argument to a conflict-handler.
**
** <dl>
................................................................................
  ),
  int(*xConflict)(
    void *pCtx,                   /* Copy of sixth arg to _apply() */
    int eConflict,                /* DATA, MISSING, CONFLICT, CONSTRAINT */
    sqlite3_changeset_iter *p     /* Handle describing change and conflict */
  ),
  void *pCtx                      /* First argument passed to xConflict */
);
/*
** Streaming counterpart of sqlite3changeset_apply_v2(): instead of a
** (nChangeset, pChangeset) buffer pair, the changeset is supplied through
** the xInput callback and its pIn context pointer. The remaining
** parameters, including the trailing ppRebase/pnRebase outputs, match the
** non-streaming v2 prototype.
**
** NOTE(review): the exact contract of ppRebase/pnRebase (ownership, value
** on error) is not documented here; presumably the same as for
** sqlite3changeset_apply_v2() - confirm against the implementation.
*/
int sqlite3changeset_apply_v2_strm(
  sqlite3 *db,                    /* Apply change to "main" db of this handle */
  int (*xInput)(void *pIn, void *pData, int *pnData), /* Input function */
  void *pIn,                                          /* First arg for xInput */
  int(*xFilter)(
    void *pCtx,                   /* Copy of sixth arg to _apply() */
    const char *zTab              /* Table name */
  ),
  int(*xConflict)(
    void *pCtx,                   /* Copy of sixth arg to _apply() */
    int eConflict,                /* DATA, MISSING, CONFLICT, CONSTRAINT */
    sqlite3_changeset_iter *p     /* Handle describing change and conflict */
  ),
  void *pCtx,                     /* First argument passed to xConflict */
  /* OUT: buffer pointer and its size in bytes (presumably rebase data;
  ** see the review note above). */
  void **ppRebase, int *pnRebase
);
int sqlite3changeset_concat_strm(
  int (*xInputA)(void *pIn, void *pData, int *pnData),
  void *pInA,
  int (*xInputB)(void *pIn, void *pData, int *pnData),
  void *pInB,
  int (*xOutput)(void *pOut, const void *pData, int nData),

Changes to ext/session/test_session.c.

707
708
709
710
711
712
713
714
715
716
717

718
719
720
721
722
723
724
725
726
727
728
729


730
731
732
733
734
735
736
...
744
745
746
747
748
749
750

751
752
753






754
755
756
757
758
759
760
761
762
763
764
765





766
767























768
769
770
771
772
773
774
....
1025
1026
1027
1028
1029
1030
1031

1032
1033
1034
1035
1036
1037
1038
  }

  *pnData = nRet;
  return SQLITE_OK;
}


/*
** sqlite3changeset_apply DB CHANGESET CONFLICT-SCRIPT ?FILTER-SCRIPT?
*/
static int SQLITE_TCLAPI test_sqlite3changeset_apply(

  void * clientData,
  Tcl_Interp *interp,
  int objc,
  Tcl_Obj *CONST objv[]
){
  sqlite3 *db;                    /* Database handle */
  Tcl_CmdInfo info;               /* Database Tcl command (objv[1]) info */
  int rc;                         /* Return code from changeset_invert() */
  void *pChangeset;               /* Buffer containing changeset */
  int nChangeset;                 /* Size of buffer aChangeset in bytes */
  TestConflictHandler ctx;
  TestStreamInput sStr;



  memset(&sStr, 0, sizeof(sStr));
  sStr.nStream = test_tcl_integer(interp, SESSION_STREAM_TCL_VAR);

  if( objc!=4 && objc!=5 ){
    Tcl_WrongNumArgs(interp, 1, objv, 
        "DB CHANGESET CONFLICT-SCRIPT ?FILTER-SCRIPT?"
................................................................................
  db = *(sqlite3 **)info.objClientData;
  pChangeset = (void *)Tcl_GetByteArrayFromObj(objv[2], &nChangeset);
  ctx.pConflictScript = objv[3];
  ctx.pFilterScript = objc==5 ? objv[4] : 0;
  ctx.interp = interp;

  if( sStr.nStream==0 ){

    rc = sqlite3changeset_apply(db, nChangeset, pChangeset, 
        (objc==5) ? test_filter_handler : 0, test_conflict_handler, (void *)&ctx
    );






  }else{
    sStr.aData = (unsigned char*)pChangeset;
    sStr.nData = nChangeset;
    rc = sqlite3changeset_apply_strm(db, testStreamInput, (void*)&sStr,
        (objc==5) ? test_filter_handler : 0, test_conflict_handler, (void *)&ctx
    );
  }

  if( rc!=SQLITE_OK ){
    return test_session_error(interp, rc, 0);
  }
  Tcl_ResetResult(interp);





  return TCL_OK;
}
























/*
** sqlite3changeset_apply_replace_all DB CHANGESET 
*/
static int SQLITE_TCLAPI test_sqlite3changeset_apply_replace_all(
  void * clientData,
  Tcl_Interp *interp,
................................................................................
    Tcl_ObjCmdProc *xProc;
  } aCmd[] = {
    { "sqlite3session", test_sqlite3session },
    { "sqlite3session_foreach", test_sqlite3session_foreach },
    { "sqlite3changeset_invert", test_sqlite3changeset_invert },
    { "sqlite3changeset_concat", test_sqlite3changeset_concat },
    { "sqlite3changeset_apply", test_sqlite3changeset_apply },

    { "sqlite3changeset_apply_replace_all", 
      test_sqlite3changeset_apply_replace_all },
    { "sql_exec_changeset", test_sql_exec_changeset },
  };
  int i;

  for(i=0; i<sizeof(aCmd)/sizeof(struct Cmd); i++){







<
<
<
|
>












>
>







 







>
|
|
|
>
>
>
>
>
>










|
|
>
>
>
>
>


>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







>







707
708
709
710
711
712
713



714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
...
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
....
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
  }

  *pnData = nRet;
  return SQLITE_OK;
}





static int SQLITE_TCLAPI testSqlite3changesetApply(
  int bV2,
  void * clientData,
  Tcl_Interp *interp,
  int objc,
  Tcl_Obj *CONST objv[]
){
  sqlite3 *db;                    /* Database handle */
  Tcl_CmdInfo info;               /* Database Tcl command (objv[1]) info */
  int rc;                         /* Return code from changeset_invert() */
  void *pChangeset;               /* Buffer containing changeset */
  int nChangeset;                 /* Size of buffer aChangeset in bytes */
  TestConflictHandler ctx;
  TestStreamInput sStr;
  void *pRebase = 0;
  int nRebase = 0;

  memset(&sStr, 0, sizeof(sStr));
  sStr.nStream = test_tcl_integer(interp, SESSION_STREAM_TCL_VAR);

  if( objc!=4 && objc!=5 ){
    Tcl_WrongNumArgs(interp, 1, objv, 
        "DB CHANGESET CONFLICT-SCRIPT ?FILTER-SCRIPT?"
................................................................................
  db = *(sqlite3 **)info.objClientData;
  pChangeset = (void *)Tcl_GetByteArrayFromObj(objv[2], &nChangeset);
  ctx.pConflictScript = objv[3];
  ctx.pFilterScript = objc==5 ? objv[4] : 0;
  ctx.interp = interp;

  if( sStr.nStream==0 ){
    if( bV2==0 ){
      rc = sqlite3changeset_apply(db, nChangeset, pChangeset, 
          (objc==5)?test_filter_handler:0, test_conflict_handler, (void *)&ctx
      );
    }else{
      rc = sqlite3changeset_apply_v2(db, nChangeset, pChangeset, 
          (objc==5)?test_filter_handler:0, test_conflict_handler, (void *)&ctx,
          &pRebase, &nRebase
      );
    }
  }else{
    sStr.aData = (unsigned char*)pChangeset;
    sStr.nData = nChangeset;
    rc = sqlite3changeset_apply_strm(db, testStreamInput, (void*)&sStr,
        (objc==5) ? test_filter_handler : 0, test_conflict_handler, (void *)&ctx
    );
  }

  if( rc!=SQLITE_OK ){
    return test_session_error(interp, rc, 0);
  }else{
    Tcl_ResetResult(interp);
    if( bV2 && pRebase ){
      Tcl_SetObjResult(interp, Tcl_NewByteArrayObj(pRebase, nRebase));
    }
  }
  sqlite3_free(pRebase);
  return TCL_OK;
}

/*
** Tcl command:
**
**   sqlite3changeset_apply DB CHANGESET CONFLICT-SCRIPT ?FILTER-SCRIPT?
**
** Entry point registered for the original (non-v2) apply interface. All
** argument parsing and the actual work are done by the shared routine
** testSqlite3changesetApply(), which is invoked here with bV2 set to 0.
** The Tcl result code produced by that routine is returned unchanged.
*/
static int SQLITE_TCLAPI test_sqlite3changeset_apply(
  void * clientData,
  Tcl_Interp *interp,
  int objc,
  Tcl_Obj *CONST objv[]
){
  const int useV2 = 0;            /* Select the original apply interface */
  int tclRc;                      /* Result code from the shared routine */

  tclRc = testSqlite3changesetApply(useV2, clientData, interp, objc, objv);
  return tclRc;
}
/*
** Tcl command:
**
**   sqlite3changeset_apply_v2 DB CHANGESET CONFLICT-SCRIPT ?FILTER-SCRIPT?
**
** Entry point registered for the v2 apply interface. It forwards to the
** shared routine testSqlite3changesetApply() with bV2 set to 1. In the
** non-streaming path that routine calls sqlite3changeset_apply_v2() and,
** on success, sets the Tcl result to the returned rebase buffer if one
** was produced. The Tcl result code is returned unchanged.
*/
static int SQLITE_TCLAPI test_sqlite3changeset_apply_v2(
  void * clientData,
  Tcl_Interp *interp,
  int objc,
  Tcl_Obj *CONST objv[]
){
  const int useV2 = 1;            /* Select the v2 apply interface */
  int tclRc;                      /* Result code from the shared routine */

  tclRc = testSqlite3changesetApply(useV2, clientData, interp, objc, objv);
  return tclRc;
}

/*
** sqlite3changeset_apply_replace_all DB CHANGESET 
*/
static int SQLITE_TCLAPI test_sqlite3changeset_apply_replace_all(
  void * clientData,
  Tcl_Interp *interp,
................................................................................
    Tcl_ObjCmdProc *xProc;
  } aCmd[] = {
    { "sqlite3session", test_sqlite3session },
    { "sqlite3session_foreach", test_sqlite3session_foreach },
    { "sqlite3changeset_invert", test_sqlite3changeset_invert },
    { "sqlite3changeset_concat", test_sqlite3changeset_concat },
    { "sqlite3changeset_apply", test_sqlite3changeset_apply },
    { "sqlite3changeset_apply_v2", test_sqlite3changeset_apply_v2 },
    { "sqlite3changeset_apply_replace_all", 
      test_sqlite3changeset_apply_replace_all },
    { "sql_exec_changeset", test_sql_exec_changeset },
  };
  int i;

  for(i=0; i<sizeof(aCmd)/sizeof(struct Cmd); i++){