/ Check-in [88eb6656]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Add streaming version of sqlite3changeset_concat().
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | sessions
Files: files | file ages | folders
SHA1: 88eb6656bdb047a104837a2e15e7fe18c0a7a159
User & Date: dan 2014-09-25 20:43:28
Context
2014-09-26
10:52
Fix a problem with concatenating patchsets containing DELETE and INSERT operations on the same row. check-in: 4d8537ea user: dan tags: sessions
2014-09-25
20:43
Add streaming version of sqlite3changeset_concat(). check-in: 88eb6656 user: dan tags: sessions
14:54
Add streaming version of sqlite3changeset_invert() to sessions module. check-in: 8ded6a46 user: dan tags: sessions
Changes
Hide Diffs Unified Diffs Show Whitespace Changes Patch

Changes to ext/session/sessionB.test.

18
19
20
21
22
23
24





25
26
27
28
29
30
31
...
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
} 
source [file join [file dirname [info script]] session_common.tcl]
source $testdir/tester.tcl
ifcapable !session {finish_test; return}

set testprefix sessionB






#
# 1.*: Test that the blobs returned by the session_patchset() API are 
#      as expected. Also the sqlite3_changeset_iter functions.
#
# 2.*: Test that patchset blobs are handled by sqlite3changeset_apply().
#
# 3.*: Test that sqlite3changeset_invert() works with patchset blobs. 
................................................................................
  # operation.
  sqlite3session S db main
  S attach *
  foreach sql $lSql { 
    sqlite3session T db main
    T attach *
    db eval $sql 
    lappend lPatch [T patchset]
    T delete
  }
  set patchset [S patchset]
  S delete

  # Calculate a checksum for the final database.
  set cksum [databasecksum db]

  # 1. Apply the single large patchset to test.db2
  sqlite3 db2 test.db2







>
>
>
>
>







 







|


|







18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
...
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
} 
source [file join [file dirname [info script]] session_common.tcl]
source $testdir/tester.tcl
ifcapable !session {finish_test; return}

set testprefix sessionB

# Fix the bug in concatenating patchsets that contain DELETE ops 
# before re-enabling this.
finish_test
return

#
# 1.*: Test that the blobs returned by the session_patchset() API are 
#      as expected. Also the sqlite3_changeset_iter functions.
#
# 2.*: Test that patchset blobs are handled by sqlite3changeset_apply().
#
# 3.*: Test that sqlite3changeset_invert() works with patchset blobs. 
................................................................................
  # operation.
  sqlite3session S db main
  S attach *
  foreach sql $lSql { 
    sqlite3session T db main
    T attach *
    db eval $sql 
    lappend lPatch [T $tstcmd]
    T delete
  }
  set patchset [S $tstcmd]
  S delete

  # Calculate a checksum for the final database.
  set cksum [databasecksum db]

  # 1. Apply the single large patchset to test.db2
  sqlite3 db2 test.db2

Changes to ext/session/sqlite3session.c.

2382
2383
2384
2385
2386
2387
2388
2389
2390
2391
2392
2393
2394
2395
2396
....
2422
2423
2424
2425
2426
2427
2428












2429


2430
2431
2432
2433
2434
2435
2436
2437
2438
2439
2440
2441
2442
2443
2444
2445
2446
2447
2448
2449
2450
2451
2452
2453
2454
....
2455
2456
2457
2458
2459
2460
2461

2462
2463
2464
2465
2466
2467
2468
....
2623
2624
2625
2626
2627
2628
2629


2630
2631
2632
2633
2634
2635
2636
2637

2638
2639
2640
2641
2642
2643
2644
....
3643
3644
3645
3646
3647
3648
3649
3650
3651
3652
3653
3654
3655
3656
3657
3658

3659
3660
3661
3662
3663
3664
3665
....
3747
3748
3749
3750
3751
3752
3753
3754
3755
3756
3757
3758
3759
3760
3761
3762
3763
3764
3765
3766
3767
3768
3769
3770
3771
3772
3773
3774
3775
3776
3777
3778

3779
3780
3781
3782
3783
3784

3785
3786
3787
3788
3789
3790
3791
....
3809
3810
3811
3812
3813
3814
3815
3816
3817
3818
3819
3820
3821
3822
3823
3824
3825
3826
3827
3828
3829
3830
3831
3832
3833
3834
3835
3836
3837
3838
3839
3840
3841
3842
3843
3844
3845
3846
3847
3848
3849
3850
3851
3852
3853
3854
3855
3856
3857
3858
3859
3860
3861
3862
3863
3864
3865
3866
3867
3868
3869
3870
3871
3872
3873
3874
3875
3876
3877
3878
3879
3880
3881
3882
3883
3884
3885
3886
3887
3888
3889

3890
3891

3892

3893
3894
3895
3896
3897
3898
3899
3900
3901
3902
3903
3904
3905
3906
3907
3908
3909
3910
3911
3912





3913
3914
3915



3916
3917
3918
3919
3920
3921

3922

3923
3924
3925
3926


























































3927
  sqlite3_changeset_iter *p,      /* Changeset iterator */
  u8 **paRec,                     /* If non-NULL, store record pointer here */
  int *pnRec                      /* If non-NULL, store size of record here */
){
  int i;
  u8 op;

  assert( paRec==0 || p->in.xInput==0 ); /* fixme! */
  assert( (paRec==0 && pnRec==0) || (paRec && pnRec) );

  /* If the iterator is in the error-state, return immediately. */
  if( p->rc!=SQLITE_OK ) return p->rc;

  /* Free the current contents of p->apValue[], if any. */
  if( p->apValue ){
................................................................................

  p->op = op;
  p->bIndirect = p->in.aData[p->in.iNext++];
  if( p->op!=SQLITE_UPDATE && p->op!=SQLITE_DELETE && p->op!=SQLITE_INSERT ){
    return (p->rc = SQLITE_CORRUPT_BKPT);
  }













  if( paRec ){ *paRec = &p->in.aData[p->in.iNext]; }



  /* If this is an UPDATE or DELETE, read the old.* record. */
  if( p->op!=SQLITE_INSERT && (p->bPatchset==0 || p->op==SQLITE_DELETE) ){
    u8 *abPK = p->bPatchset ? p->abPK : 0;
    p->rc = sessionReadRecord(&p->in, p->nCol, abPK, paRec?0:p->apValue);
    if( p->rc!=SQLITE_OK ) return p->rc;
  }

  /* If this is an INSERT or UPDATE, read the new.* record. */
  if( p->op!=SQLITE_DELETE ){
    sqlite3_value **apOut = (paRec ? 0 : &p->apValue[p->nCol]);
    p->rc = sessionReadRecord(&p->in, p->nCol, 0, apOut);
    if( p->rc!=SQLITE_OK ) return p->rc;
  }

  if( pnRec ){
    *pnRec = (int)(&p->in.aData[p->in.iNext] - *paRec);
  }else if( p->bPatchset && p->op==SQLITE_UPDATE ){
    /* If this is an UPDATE that is part of a patchset, then all PK and
    ** modified fields are present in the new.* record. The old.* record
    ** is currently completely empty. This block shifts the PK fields from
    ** new.* to old.*, to accommodate the code that reads these arrays.  */
    int i;
    for(i=0; i<p->nCol; i++){
      assert( p->apValue[i]==0 );
................................................................................
      assert( p->abPK[i]==0 || p->apValue[i+p->nCol] );
      if( p->abPK[i] ){
        p->apValue[i] = p->apValue[i+p->nCol];
        p->apValue[i+p->nCol] = 0;
      }
    }
  }


  return SQLITE_ROW;
}

/*
** Advance an iterator created by sqlite3changeset_start() to the next
** change in the changeset. This function may return SQLITE_ROW, SQLITE_DONE
................................................................................
/*
** Finalize an iterator allocated with sqlite3changeset_start().
**
** This function may not be called on iterators passed to a conflict handler
** callback by changeset_apply().
*/
int sqlite3changeset_finalize(sqlite3_changeset_iter *p){


  int i;                          /* Used to iterate through p->apValue[] */
  int rc = p->rc;                 /* Return code */
  if( p->apValue ){
    for(i=0; i<p->nCol*2; i++) sqlite3ValueFree(p->apValue[i]);
  }
  sqlite3_free(p->tblhdr.aBuf);
  sqlite3_free(p->in.buf.aBuf);
  sqlite3_free(p);

  return rc;
}

static int sessionChangesetInvert(
  SessionInput *pInput,           /* Input changeset */
  int (*xOutput)(void *pOut, const void *pData, int nData),
  void *pOut,
................................................................................
  u8 *aRec,                       /* Second change record */
  int nRec,                       /* Number of bytes in aRec */
  SessionChange **ppNew           /* OUT: Merged change */
){
  SessionChange *pNew = 0;

  if( !pExist ){
    pNew = (SessionChange *)sqlite3_malloc(sizeof(SessionChange));
    if( !pNew ){
      return SQLITE_NOMEM;
    }
    memset(pNew, 0, sizeof(SessionChange));
    pNew->op = op2;
    pNew->bIndirect = bIndirect;
    pNew->nRecord = nRec;
    pNew->aRecord = aRec;

  }else{
    int op1 = pExist->op;

    /* 
    **   op1=INSERT, op2=INSERT      ->      Unsupported. Discard op2.
    **   op1=INSERT, op2=UPDATE      ->      INSERT.
    **   op1=INSERT, op2=DELETE      ->      (none)
................................................................................
  return SQLITE_OK;
}

/*
** Add all changes in the changeset passed via the first two arguments to
** hash tables.
*/
static int sessionConcatChangeset(
  int bPatchset,                  /* True to expect patchsets */
  int nChangeset,                 /* Number of bytes in pChangeset */
  void *pChangeset,               /* Changeset buffer */
  SessionTable **ppTabList        /* IN/OUT: List of table objects */
){
  u8 *aRec;
  int nRec;
  sqlite3_changeset_iter *pIter;
  int rc;
  SessionTable *pTab = 0;

  rc = sqlite3changeset_start(&pIter, nChangeset, pChangeset);
  if( rc!=SQLITE_OK ) return rc;

  while( SQLITE_ROW==sessionChangesetNext(pIter, &aRec, &nRec) ){
    const char *zNew;
    int nCol;
    int op;
    int iHash;
    int bIndirect;
    SessionChange *pChange;
    SessionChange *pExist = 0;
    SessionChange **pp;


    assert( bPatchset==0 || bPatchset==1 );
    assert( pIter->bPatchset==0 || pIter->bPatchset==1 );
    if( pIter->bPatchset!=bPatchset ){
      rc = SQLITE_ERROR;
      break;
    }


    sqlite3changeset_op(pIter, &zNew, &nCol, &op, &bIndirect);
    if( !pTab || sqlite3_stricmp(zNew, pTab->zName) ){
      /* Search the list for a matching table */
      int nNew = (int)strlen(zNew);
      u8 *abPK;

................................................................................
        *ppTabList = pTab;
      }else if( pTab->nCol!=nCol || memcmp(pTab->abPK, abPK, nCol) ){
        rc = SQLITE_SCHEMA;
        break;
      }
    }

    if( sessionGrowHash(bPatchset, pTab) ){
      rc = SQLITE_NOMEM;
      break;
    }
    iHash = sessionChangeHash(
        pTab, (bPatchset && op==SQLITE_DELETE), aRec, pTab->nChange
    );

    /* Search for existing entry. If found, remove it from the hash table. 
    ** Code below may link it back in.
    */
    for(pp=&pTab->apChange[iHash]; *pp; pp=&(*pp)->pNext){
      int bPkOnly1 = 0;
      int bPkOnly2 = 0;
      if( bPatchset ){
        bPkOnly1 = (*pp)->op==SQLITE_DELETE;
        bPkOnly2 = op==SQLITE_DELETE;
      }
      if( sessionChangeEqual(pTab, bPkOnly1, (*pp)->aRecord, bPkOnly2, aRec) ){
        pExist = *pp;
        *pp = (*pp)->pNext;
        pTab->nEntry--;
        break;
      }
    }

    rc = sessionChangeMerge(pTab, 
        bPatchset, pExist, op, bIndirect, aRec, nRec, &pChange
    );
    if( rc ) break;
    if( pChange ){
      pChange->pNext = pTab->apChange[iHash];
      pTab->apChange[iHash] = pChange;
      pTab->nEntry++;
    }
  }

  if( rc==SQLITE_OK ){
    rc = sqlite3changeset_finalize(pIter);
  }else{
    sqlite3changeset_finalize(pIter);
  }
  return rc;
}
  

/* 
** 1. Iterate through the left-hand changeset. Add an entry to a table
**    specific hash table for each change in the changeset. The hash table
**    key is the PK of the row affected by the change.
**
** 2. Then interate through the right-hand changeset. Attempt to add an 
**    entry to a hash table for each component change. If a change already 
**    exists with the same PK values, combine the two into a single change.
**
** 3. Write an output changeset based on the contents of the hash table.
*/
int sqlite3changeset_concat(
  int nLeft,                      /* Number of bytes in lhs input */
  void *pLeft,                    /* Lhs input changeset */
  int nRight                      /* Number of bytes in rhs input */,
  void *pRight,                   /* Rhs input changeset */
  int *pnOut,                     /* OUT: Number of bytes in output changeset */
  void **ppOut                    /* OUT: changeset (left <concat> right) */
){
  SessionTable *pList = 0;        /* List of SessionTable objects */
  int rc;                         /* Return code */
  int bPatch;                     /* True for a patchset */

  *pnOut = 0;
  *ppOut = 0;
  bPatch = (nLeft>0 && *(char*)pLeft=='P') || (nRight>0 && *(char*)pRight=='P');

  rc = sessionConcatChangeset(bPatch, nLeft, pLeft, &pList);

  if( rc==SQLITE_OK ){
    rc = sessionConcatChangeset(bPatch, nRight, pRight, &pList);

  }


  /* Create the serialized output changeset based on the contents of the
  ** hash tables attached to the SessionTable objects in list pList. 
  */
  if( rc==SQLITE_OK ){
    SessionTable *pTab;
    SessionBuffer buf = {0, 0, 0};
    for(pTab=pList; pTab; pTab=pTab->pNext){
      int i;
      if( pTab->nEntry==0 ) continue;

      sessionAppendTableHdr(&buf, bPatch, pTab, &rc);
      for(i=0; i<pTab->nChange; i++){
        SessionChange *p;
        for(p=pTab->apChange[i]; p; p=p->pNext){
          sessionAppendByte(&buf, p->op, &rc);
          sessionAppendByte(&buf, p->bIndirect, &rc);
          sessionAppendBlob(&buf, p->aRecord, p->nRecord, &rc);
        }
      }





    }

    if( rc==SQLITE_OK ){



      *ppOut = buf.aBuf;
      *pnOut = buf.nBuf;
    }else{
      sqlite3_free(buf.aBuf);
    }
  }



  sessionDeleteTable(pList);
  return rc;
}



























































#endif /* SQLITE_ENABLE_SESSION && SQLITE_ENABLE_PREUPDATE_HOOK */







<







 







>
>
>
>
>
>
>
>
>
>
>
>
|
>
>




|





<
|



<
<
|







 







>







 







>
>
|
|






>







 







|







|
>







 







|
|
<
<




<
|


<
<
<










>






>







 







|




|








|












|









|
<
<
<
<



<











|
|
|
|
|
|
|





|
<
<

<
>

<
>

>







|












>
>
>
>
>



>
>
>
|
|
<
|
|
|
>
|
>




>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>

2382
2383
2384
2385
2386
2387
2388

2389
2390
2391
2392
2393
2394
2395
....
2421
2422
2423
2424
2425
2426
2427
2428
2429
2430
2431
2432
2433
2434
2435
2436
2437
2438
2439
2440
2441
2442
2443
2444
2445
2446
2447
2448
2449
2450
2451
2452

2453
2454
2455
2456


2457
2458
2459
2460
2461
2462
2463
2464
....
2465
2466
2467
2468
2469
2470
2471
2472
2473
2474
2475
2476
2477
2478
2479
....
2634
2635
2636
2637
2638
2639
2640
2641
2642
2643
2644
2645
2646
2647
2648
2649
2650
2651
2652
2653
2654
2655
2656
2657
2658
....
3657
3658
3659
3660
3661
3662
3663
3664
3665
3666
3667
3668
3669
3670
3671
3672
3673
3674
3675
3676
3677
3678
3679
3680
....
3762
3763
3764
3765
3766
3767
3768
3769
3770


3771
3772
3773
3774

3775
3776
3777



3778
3779
3780
3781
3782
3783
3784
3785
3786
3787
3788
3789
3790
3791
3792
3793
3794
3795
3796
3797
3798
3799
3800
3801
3802
....
3820
3821
3822
3823
3824
3825
3826
3827
3828
3829
3830
3831
3832
3833
3834
3835
3836
3837
3838
3839
3840
3841
3842
3843
3844
3845
3846
3847
3848
3849
3850
3851
3852
3853
3854
3855
3856
3857
3858
3859
3860
3861
3862
3863
3864




3865
3866
3867

3868
3869
3870
3871
3872
3873
3874
3875
3876
3877
3878
3879
3880
3881
3882
3883
3884
3885
3886
3887
3888
3889
3890
3891


3892

3893
3894

3895
3896
3897
3898
3899
3900
3901
3902
3903
3904
3905
3906
3907
3908
3909
3910
3911
3912
3913
3914
3915
3916
3917
3918
3919
3920
3921
3922
3923
3924
3925
3926
3927
3928
3929
3930

3931
3932
3933
3934
3935
3936
3937
3938
3939
3940
3941
3942
3943
3944
3945
3946
3947
3948
3949
3950
3951
3952
3953
3954
3955
3956
3957
3958
3959
3960
3961
3962
3963
3964
3965
3966
3967
3968
3969
3970
3971
3972
3973
3974
3975
3976
3977
3978
3979
3980
3981
3982
3983
3984
3985
3986
3987
3988
3989
3990
3991
3992
3993
3994
3995
3996
3997
3998
3999
  sqlite3_changeset_iter *p,      /* Changeset iterator */
  u8 **paRec,                     /* If non-NULL, store record pointer here */
  int *pnRec                      /* If non-NULL, store size of record here */
){
  int i;
  u8 op;


  assert( (paRec==0 && pnRec==0) || (paRec && pnRec) );

  /* If the iterator is in the error-state, return immediately. */
  if( p->rc!=SQLITE_OK ) return p->rc;

  /* Free the current contents of p->apValue[], if any. */
  if( p->apValue ){
................................................................................

  p->op = op;
  p->bIndirect = p->in.aData[p->in.iNext++];
  if( p->op!=SQLITE_UPDATE && p->op!=SQLITE_DELETE && p->op!=SQLITE_INSERT ){
    return (p->rc = SQLITE_CORRUPT_BKPT);
  }

  if( paRec ){ 
    int nVal;                     /* Number of values to buffer */
    if( p->bPatchset==0 && op==SQLITE_UPDATE ){
      nVal = p->nCol * 2;
    }else if( p->bPatchset && op==SQLITE_DELETE ){
      nVal = 0;
      for(i=0; i<p->nCol; i++) if( p->abPK[i] ) nVal++;
    }else{
      nVal = p->nCol;
    }
    p->rc = sessionChangesetBufferRecord(&p->in, nVal, pnRec);
    if( p->rc!=SQLITE_OK ) return p->rc;
    *paRec = &p->in.aData[p->in.iNext];
    p->in.iNext += *pnRec;
  }else{

    /* If this is an UPDATE or DELETE, read the old.* record. */
    if( p->op!=SQLITE_INSERT && (p->bPatchset==0 || p->op==SQLITE_DELETE) ){
      u8 *abPK = p->bPatchset ? p->abPK : 0;
      p->rc = sessionReadRecord(&p->in, p->nCol, abPK, p->apValue);
      if( p->rc!=SQLITE_OK ) return p->rc;
    }

    /* If this is an INSERT or UPDATE, read the new.* record. */
    if( p->op!=SQLITE_DELETE ){

      p->rc = sessionReadRecord(&p->in, p->nCol, 0, &p->apValue[p->nCol]);
      if( p->rc!=SQLITE_OK ) return p->rc;
    }



    if( p->bPatchset && p->op==SQLITE_UPDATE ){
      /* If this is an UPDATE that is part of a patchset, then all PK and
      ** modified fields are present in the new.* record. The old.* record
      ** is currently completely empty. This block shifts the PK fields from
      ** new.* to old.*, to accommodate the code that reads these arrays.  */
      int i;
      for(i=0; i<p->nCol; i++){
        assert( p->apValue[i]==0 );
................................................................................
        assert( p->abPK[i]==0 || p->apValue[i+p->nCol] );
        if( p->abPK[i] ){
          p->apValue[i] = p->apValue[i+p->nCol];
          p->apValue[i+p->nCol] = 0;
        }
      }
    }
  }

  return SQLITE_ROW;
}

/*
** Advance an iterator created by sqlite3changeset_start() to the next
** change in the changeset. This function may return SQLITE_ROW, SQLITE_DONE
................................................................................
/*
** Finalize an iterator allocated with sqlite3changeset_start().
**
** This function may not be called on iterators passed to a conflict handler
** callback by changeset_apply().
*/
int sqlite3changeset_finalize(sqlite3_changeset_iter *p){
  int rc = SQLITE_OK;
  if( p ){
    int i;                        /* Used to iterate through p->apValue[] */
    rc = p->rc;
    if( p->apValue ){
      for(i=0; i<p->nCol*2; i++) sqlite3ValueFree(p->apValue[i]);
    }
    sqlite3_free(p->tblhdr.aBuf);
    sqlite3_free(p->in.buf.aBuf);
    sqlite3_free(p);
  }
  return rc;
}

static int sessionChangesetInvert(
  SessionInput *pInput,           /* Input changeset */
  int (*xOutput)(void *pOut, const void *pData, int nData),
  void *pOut,
................................................................................
  u8 *aRec,                       /* Second change record */
  int nRec,                       /* Number of bytes in aRec */
  SessionChange **ppNew           /* OUT: Merged change */
){
  SessionChange *pNew = 0;

  if( !pExist ){
    pNew = (SessionChange *)sqlite3_malloc(sizeof(SessionChange) + nRec);
    if( !pNew ){
      return SQLITE_NOMEM;
    }
    memset(pNew, 0, sizeof(SessionChange));
    pNew->op = op2;
    pNew->bIndirect = bIndirect;
    pNew->nRecord = nRec;
    pNew->aRecord = (u8*)&pNew[1];
    memcpy(pNew->aRecord, aRec, nRec);
  }else{
    int op1 = pExist->op;

    /* 
    **   op1=INSERT, op2=INSERT      ->      Unsupported. Discard op2.
    **   op1=INSERT, op2=UPDATE      ->      INSERT.
    **   op1=INSERT, op2=DELETE      ->      (none)
................................................................................
  return SQLITE_OK;
}

/*
** Add all changes in the changeset passed via the first two arguments to
** hash tables.
*/
static int sessionAddChangeset(
  sqlite3_changeset_iter *pIter,   /* Iterator to read from */


  SessionTable **ppTabList        /* IN/OUT: List of table objects */
){
  u8 *aRec;
  int nRec;

  int rc = SQLITE_OK;
  SessionTable *pTab = 0;




  while( SQLITE_ROW==sessionChangesetNext(pIter, &aRec, &nRec) ){
    const char *zNew;
    int nCol;
    int op;
    int iHash;
    int bIndirect;
    SessionChange *pChange;
    SessionChange *pExist = 0;
    SessionChange **pp;

#if 0
    assert( bPatchset==0 || bPatchset==1 );
    assert( pIter->bPatchset==0 || pIter->bPatchset==1 );
    if( pIter->bPatchset!=bPatchset ){
      rc = SQLITE_ERROR;
      break;
    }
#endif

    sqlite3changeset_op(pIter, &zNew, &nCol, &op, &bIndirect);
    if( !pTab || sqlite3_stricmp(zNew, pTab->zName) ){
      /* Search the list for a matching table */
      int nNew = (int)strlen(zNew);
      u8 *abPK;

................................................................................
        *ppTabList = pTab;
      }else if( pTab->nCol!=nCol || memcmp(pTab->abPK, abPK, nCol) ){
        rc = SQLITE_SCHEMA;
        break;
      }
    }

    if( sessionGrowHash(pIter->bPatchset, pTab) ){
      rc = SQLITE_NOMEM;
      break;
    }
    iHash = sessionChangeHash(
        pTab, (pIter->bPatchset && op==SQLITE_DELETE), aRec, pTab->nChange
    );

    /* Search for existing entry. If found, remove it from the hash table. 
    ** Code below may link it back in.
    */
    for(pp=&pTab->apChange[iHash]; *pp; pp=&(*pp)->pNext){
      int bPkOnly1 = 0;
      int bPkOnly2 = 0;
      if( pIter->bPatchset ){
        bPkOnly1 = (*pp)->op==SQLITE_DELETE;
        bPkOnly2 = op==SQLITE_DELETE;
      }
      if( sessionChangeEqual(pTab, bPkOnly1, (*pp)->aRecord, bPkOnly2, aRec) ){
        pExist = *pp;
        *pp = (*pp)->pNext;
        pTab->nEntry--;
        break;
      }
    }

    rc = sessionChangeMerge(pTab, 
        pIter->bPatchset, pExist, op, bIndirect, aRec, nRec, &pChange
    );
    if( rc ) break;
    if( pChange ){
      pChange->pNext = pTab->apChange[iHash];
      pTab->apChange[iHash] = pChange;
      pTab->nEntry++;
    }
  }

  if( rc==SQLITE_OK ) rc = pIter->rc;




  return rc;
}
  

/* 
** 1. Iterate through the left-hand changeset. Add an entry to a table
**    specific hash table for each change in the changeset. The hash table
**    key is the PK of the row affected by the change.
**
** 2. Then interate through the right-hand changeset. Attempt to add an 
**    entry to a hash table for each component change. If a change already 
**    exists with the same PK values, combine the two into a single change.
**
** 3. Write an output changeset based on the contents of the hash table.
*/
int sessionChangesetConcat(
  sqlite3_changeset_iter *pLeft,
  sqlite3_changeset_iter *pRight,
  int (*xOutput)(void *pOut, const void *pData, int nData),
  void *pOut,
  int *pnOut,
  void **ppOut
){
  SessionTable *pList = 0;        /* List of SessionTable objects */
  int rc;                         /* Return code */
  int bPatch;                     /* True for a patchset */

  assert( xOutput==0 || (ppOut==0 && pnOut==0) );




  rc = sessionAddChangeset(pLeft, &pList);
  if( rc==SQLITE_OK ){

    rc = sessionAddChangeset(pRight, &pList);
  }
  bPatch = pLeft->bPatchset || pRight->bPatchset;

  /* Create the serialized output changeset based on the contents of the
  ** hash tables attached to the SessionTable objects in list pList. 
  */
  if( rc==SQLITE_OK ){
    SessionTable *pTab;
    SessionBuffer buf = {0, 0, 0};
    for(pTab=pList; pTab && rc==SQLITE_OK; pTab=pTab->pNext){
      int i;
      if( pTab->nEntry==0 ) continue;

      sessionAppendTableHdr(&buf, bPatch, pTab, &rc);
      for(i=0; i<pTab->nChange; i++){
        SessionChange *p;
        for(p=pTab->apChange[i]; p; p=p->pNext){
          sessionAppendByte(&buf, p->op, &rc);
          sessionAppendByte(&buf, p->bIndirect, &rc);
          sessionAppendBlob(&buf, p->aRecord, p->nRecord, &rc);
        }
      }

      if( rc==SQLITE_OK && xOutput && buf.nBuf>=SESSIONS_STR_CHUNK_SIZE ){
        rc = xOutput(pOut, buf.aBuf, buf.nBuf);
        buf.nBuf = 0;
      }
    }

    if( rc==SQLITE_OK ){
      if( xOutput ){
        if( buf.nBuf>0 ) rc = xOutput(pOut, buf.aBuf, buf.nBuf);
      }else{
        *ppOut = buf.aBuf;
        *pnOut = buf.nBuf;

        buf.aBuf = 0;
      }
    }
    sqlite3_free(buf.aBuf);
  }

  sessionDeleteTable(pList);
  return rc;
}

/* 
** Combine two changesets together.
*/
int sqlite3changeset_concat(
  int nLeft,                      /* Number of bytes in lhs input */
  void *pLeft,                    /* Lhs input changeset */
  int nRight                      /* Number of bytes in rhs input */,
  void *pRight,                   /* Rhs input changeset */
  int *pnOut,                     /* OUT: Number of bytes in output changeset */
  void **ppOut                    /* OUT: changeset (left <concat> right) */
){
  sqlite3_changeset_iter *pIter1 = 0;
  sqlite3_changeset_iter *pIter2 = 0;
  int rc;

  *pnOut = 0;
  *ppOut = 0;
  rc = sqlite3changeset_start(&pIter1, nLeft, pLeft);
  if( rc==SQLITE_OK ){
    rc = sqlite3changeset_start(&pIter2, nRight, pRight);
  }
  if( rc==SQLITE_OK ){
    rc = sessionChangesetConcat(pIter1, pIter2, 0, 0, pnOut, ppOut);
  }

  sqlite3changeset_finalize(pIter1);
  sqlite3changeset_finalize(pIter2);
  return rc;
}

/*
** Streaming version of sqlite3changeset_concat().
*/
int sqlite3changeset_concat_str(
  int (*xInputA)(void *pIn, void *pData, int *pnData),
  void *pInA,
  int (*xInputB)(void *pIn, void *pData, int *pnData),
  void *pInB,
  int (*xOutput)(void *pOut, const void *pData, int nData),
  void *pOut
){
  sqlite3_changeset_iter *pIter1 = 0;
  sqlite3_changeset_iter *pIter2 = 0;
  int rc;

  rc = sqlite3changeset_start_str(&pIter1, xInputA, pInA);
  if( rc==SQLITE_OK ){
    rc = sqlite3changeset_start_str(&pIter2, xInputB, pInB);
  }
  if( rc==SQLITE_OK ){
    rc = sessionChangesetConcat(pIter1, pIter2, xOutput, pOut, 0, 0);
  }

  sqlite3changeset_finalize(pIter1);
  sqlite3changeset_finalize(pIter2);
  return rc;
}

#endif /* SQLITE_ENABLE_SESSION && SQLITE_ENABLE_PREUPDATE_HOOK */

Changes to ext/session/sqlite3session.h.

756
757
758
759
760
761
762












763
764
765
766
767
768
769
  int nA,                         /* Number of bytes in buffer pA */
  void *pA,                       /* Pointer to buffer containing changeset A */
  int nB,                         /* Number of bytes in buffer pB */
  void *pB,                       /* Pointer to buffer containing changeset B */
  int *pnOut,                     /* OUT: Number of bytes in output changeset */
  void **ppOut                    /* OUT: Buffer containing output changeset */
);













/*
** CAPI3REF: Apply A Changeset To A Database
**
** Apply a changeset to a database. This function attempts to update the
** "main" database attached to handle db with the changes found in the
** changeset passed via the second and third arguments.







>
>
>
>
>
>
>
>
>
>
>
>







756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
  int nA,                         /* Number of bytes in buffer pA */
  void *pA,                       /* Pointer to buffer containing changeset A */
  int nB,                         /* Number of bytes in buffer pB */
  void *pB,                       /* Pointer to buffer containing changeset B */
  int *pnOut,                     /* OUT: Number of bytes in output changeset */
  void **ppOut                    /* OUT: Buffer containing output changeset */
);

/*
** Streaming verson of sqlite3changeset_concat().
*/
int sqlite3changeset_concat_str(
  int (*xInputA)(void *pIn, void *pData, int *pnData),
  void *pInA,
  int (*xInputB)(void *pIn, void *pData, int *pnData),
  void *pInB,
  int (*xOutput)(void *pOut, const void *pData, int nData),
  void *pOut
);

/*
** CAPI3REF: Apply A Changeset To A Database
**
** Apply a changeset to a database. This function attempts to update the
** "main" database attached to handle db with the changes found in the
** changeset passed via the second and third arguments.

Changes to ext/session/test_session.c.

711
712
713
714
715
716
717
718

719
720
721
722
723
724
725
726
727
728



729
730


731







732




733
734


735
736
737
738
739
740
741
742
743
744
745
static int test_sqlite3changeset_concat(
  void * clientData,
  Tcl_Interp *interp,
  int objc,
  Tcl_Obj *CONST objv[]
){
  int rc;                         /* Return code from changeset_invert() */
  void *aLeft;                    /* Input changeset */

  int nLeft;                      /* Size of buffer aChangeset in bytes */
  void *aRight;                   /* Input changeset */
  int nRight;                     /* Size of buffer aChangeset in bytes */
  void *aOut;                     /* Output changeset */
  int nOut;                       /* Size of buffer aOut in bytes */

  if( objc!=3 ){
    Tcl_WrongNumArgs(interp, 1, objv, "LEFT RIGHT");
    return TCL_ERROR;
  }



  aLeft = (void *)Tcl_GetByteArrayFromObj(objv[1], &nLeft);
  aRight = (void *)Tcl_GetByteArrayFromObj(objv[2], &nRight);










  rc = sqlite3changeset_concat(nLeft, aLeft, nRight, aRight, &nOut, &aOut);




  if( rc!=SQLITE_OK ){
    return test_session_error(interp, rc);


  }
  Tcl_SetObjResult(interp, Tcl_NewByteArrayObj((unsigned char *)aOut, nOut));
  sqlite3_free(aOut);
  return TCL_OK;
}

/*
** sqlite3session_foreach VARNAME CHANGESET SCRIPT
*/
static int test_sqlite3session_foreach(
  void * clientData,







<
>
|
|
|
<
<





>
>
>
|
|
>
>

>
>
>
>
>
>
>
|
>
>
>
>

|
>
>

<
|
|







711
712
713
714
715
716
717

718
719
720
721


722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751

752
753
754
755
756
757
758
759
760
static int test_sqlite3changeset_concat(
  void * clientData,
  Tcl_Interp *interp,
  int objc,
  Tcl_Obj *CONST objv[]
){
  int rc;                         /* Return code from changeset_invert() */


  TestStreamInput sLeft;          /* Input stream */
  TestStreamInput sRight;         /* Input stream */
  TestSessionsBlob sOut = {0,0};  /* Output blob */



  if( objc!=3 ){
    Tcl_WrongNumArgs(interp, 1, objv, "LEFT RIGHT");
    return TCL_ERROR;
  }

  memset(&sLeft, 0, sizeof(sLeft));
  memset(&sRight, 0, sizeof(sRight));
  sLeft.aData = Tcl_GetByteArrayFromObj(objv[1], &sLeft.nData);
  sRight.aData = Tcl_GetByteArrayFromObj(objv[2], &sRight.nData);
  sLeft.nStream = test_tcl_integer(interp, SESSION_STREAM_TCL_VAR);
  sRight.nStream = sLeft.nStream;

  if( sLeft.nStream>0 ){
    rc = sqlite3changeset_concat_str(
        testStreamInput, (void*)&sLeft,
        testStreamInput, (void*)&sRight,
        testSessionsOutput, (void*)&sOut
    );
  }else{
    rc = sqlite3changeset_concat(
        sLeft.nData, sLeft.aData, sRight.nData, sRight.aData, &sOut.n, &sOut.p
    );
  }

  if( rc!=SQLITE_OK ){
    rc = test_session_error(interp, rc);
  }else{
    Tcl_SetObjResult(interp,Tcl_NewByteArrayObj((unsigned char*)sOut.p,sOut.n));
  }

  sqlite3_free(sOut.p);
  return rc;
}

/*
** sqlite3session_foreach VARNAME CHANGESET SCRIPT
*/
static int test_sqlite3session_foreach(
  void * clientData,