/ Check-in [1f44bfdc]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Change the names of the stream interface APIs to be of the form "_strm" instead of "_str". In other words, added an "m" to the end, to try to make it clear that we are talking about a "stream" and not a "string.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | sessions
Files: files | file ages | folders
SHA1:1f44bfdc237ee6304f4aa56e5c5d1c0d74fcc944
User & Date: drh 2014-09-27 20:45:48
Context
2014-10-01
01:52
Merge the latest enhancements from trunk. check-in: 2695772c user: drh tags: sessions
2014-09-27
20:45
Change the names of the stream interface APIs to be of the form "_strm" instead of "_str". In other words, added an "m" to the end, to try to make it clear that we are talking about a "stream" and not a "string. check-in: 1f44bfdc user: drh tags: sessions
19:51
Merge recent trunk changes (performance enhancements) into the sessions branch. check-in: 497367cb user: drh tags: sessions
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to ext/session/session5.test.

402
403
404
405
406
407
408
409
} {
  DELETE FROM t1 WHERE a = 'key';
} {
  {DELETE t1 0 X. {t key t valueX} {}}
}

finish_test








<
402
403
404
405
406
407
408

} {
  DELETE FROM t1 WHERE a = 'key';
} {
  {DELETE t1 0 X. {t key t valueX} {}}
}

finish_test

Changes to ext/session/session8.test.

85
86
87
88
89
90
91
92
  INSERT INTO t3 VALUES(0.0, 'abcdef', 12, 'x', 45);
}

do_then_undo 3.2 { UPDATE t3 SET b=b||b WHERE e!='x' }
do_then_undo 3.3 { UPDATE t3 SET a = 46 }

finish_test








<
85
86
87
88
89
90
91

  INSERT INTO t3 VALUES(0.0, 'abcdef', 12, 'x', 45);
}

do_then_undo 3.2 { UPDATE t3 SET b=b||b WHERE e!='x' }
do_then_undo 3.3 { UPDATE t3 SET a = 46 }

finish_test

Changes to ext/session/session9.test.

281
282
283
284
285
286
287
288
} {1 SQLITE_ABORT}

do_execsql_test 5.4 {
  SELECT * FROM X1;
} {1 1}

finish_test








<
281
282
283
284
285
286
287

} {1 SQLITE_ABORT}

do_execsql_test 5.4 {
  SELECT * FROM X1;
} {1 1}

finish_test

Changes to ext/session/sessionA.test.

100
101
102
103
104
105
106
107
108
  execsql {
    SELECT * FROM t1;
    SELECT * FROM t2;
  }
} {x y}

finish_test









<
<
100
101
102
103
104
105
106


  execsql {
    SELECT * FROM t1;
    SELECT * FROM t2;
  }
} {x y}

finish_test


Changes to ext/session/sessionB.test.

512
513
514
515
516
517
518
519
  $initsql $insert $delete     \
  $insert $delete              \
  "$insert $delete"            \
  $delete


finish_test








<
512
513
514
515
516
517
518

  $initsql $insert $delete     \
  $insert $delete              \
  "$insert $delete"            \
  $delete


finish_test

Changes to ext/session/session_common.tcl.

129
130
131
132
133
134
135
136
}

proc changeset_to_list {c} {
  set list [list]
  sqlite3session_foreach elem $c { lappend list $elem }
  lsort $list
}








<
129
130
131
132
133
134
135

}

proc changeset_to_list {c} {
  set list [list]
  sqlite3session_foreach elem $c { lappend list $elem }
  lsort $list
}

Changes to ext/session/sqlite3session.c.

13
14
15
16
17
18
19

20
21
22
23

24
25
26
27
28
29
30
..
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
....
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
....
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984
1985
....
2040
2041
2042
2043
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
....
2088
2089
2090
2091
2092
2093
2094
2095
2096
2097
2098
2099
2100
2101
2102
....
2107
2108
2109
2110
2111
2112
2113
2114
2115
2116
2117
2118
2119
2120
2121
2122
2123
....
2797
2798
2799
2800
2801
2802
2803
2804
2805
2806
2807
2808
2809
2810
2811
....
2843
2844
2845
2846
2847
2848
2849
2850
2851
2852
2853
2854
2855
2856
2857
....
3637
3638
3639
3640
3641
3642
3643
3644
3645
3646
3647
3648
3649
3650
3651
....
3653
3654
3655
3656
3657
3658
3659
3660
3661
3662
3663
3664
3665
3666
3667
....
3928
3929
3930
3931
3932
3933
3934
3935
3936
3937
3938
3939
3940
3941
3942
....
3982
3983
3984
3985
3986
3987
3988
3989
3990
3991
3992
3993
3994
3995
3996
3997
3998
3999
4000
4001
4002
4003
4004
4005
4006
4007
4008
4009
4010
4011
4012
4013
4014
typedef struct SessionChange SessionChange;
typedef struct SessionBuffer SessionBuffer;
typedef struct SessionInput SessionInput;

/*
** Minimum chunk size used by streaming versions of functions.
*/

#ifdef SQLITE_TEST
#define SESSIONS_STR_CHUNK_SIZE 64
#else
#define SESSIONS_STR_CHUNK_SIZE 1024

#endif

/*
** Session handle structure.
*/
struct sqlite3_session {
  sqlite3 *db;                    /* Database handle session is attached to */
................................................................................
  int nAlloc;                     /* Size of allocation containing aBuf */
};

/*
** An object of this type is used internally as an abstraction for 
** input data. Input data may be supplied either as a single large buffer
** (e.g. sqlite3changeset_start()) or using a stream function (e.g.
**  sqlite3changeset_start_str()).
*/
struct SessionInput {
  int iNext;                      /* Offset in aData[] of next change */
  u8 *aData;                      /* Pointer to buffer containing changeset */
  int nData;                      /* Number of bytes in aData */

  SessionBuffer buf;              /* Current read buffer */
................................................................................
          }else if( p->op!=SQLITE_INSERT ){
            rc = sessionAppendDelete(&buf, bPatchset, p, nCol, abPK);
          }
          if( rc==SQLITE_OK ){
            rc = sqlite3_reset(pSel);
          }

          /* If the buffer is now larger than SESSIONS_STR_CHUNK_SIZE, pass
          ** its contents to the xOutput() callback. */
          if( xOutput 
           && rc==SQLITE_OK 
           && buf.nBuf>nNoop 
           && buf.nBuf>SESSIONS_STR_CHUNK_SIZE 
          ){
            rc = xOutput(pOut, (void*)buf.aBuf, buf.nBuf);
            nNoop = -1;
            buf.nBuf = 0;
          }

        }
................................................................................
){
  return sessionGenerateChangeset(pSession, 0, 0, 0, pnChangeset, ppChangeset);
}

/*
** Streaming version of sqlite3session_changeset().
*/
int sqlite3session_changeset_str(
  sqlite3_session *pSession,
  int (*xOutput)(void *pOut, const void *pData, int nData),
  void *pOut
){
  return sessionGenerateChangeset(pSession, 0, xOutput, pOut, 0, 0);
}

/*
** Streaming version of sqlite3session_patchset().
*/
int sqlite3session_patchset_str(
  sqlite3_session *pSession,
  int (*xOutput)(void *pOut, const void *pData, int nData),
  void *pOut
){
  return sessionGenerateChangeset(pSession, 1, xOutput, pOut, 0, 0);
}

................................................................................
  }
  sqlite3_mutex_leave(sqlite3_db_mutex(pSession->db));

  return (ret==0);
}

/*
** Do the work for either sqlite3changeset_start() or start_str().
*/
int sessionChangesetStart(
  sqlite3_changeset_iter **pp,    /* OUT: Changeset iterator handle */
  int (*xInput)(void *pIn, void *pData, int *pnData),
  void *pIn,
  int nChangeset,                 /* Size of buffer pChangeset in bytes */
  void *pChangeset                /* Pointer to buffer containing changeset */
................................................................................
){
  return sessionChangesetStart(pp, 0, 0, nChangeset, pChangeset);
}

/*
** Streaming version of sqlite3changeset_start().
*/
int sqlite3changeset_start_str(
  sqlite3_changeset_iter **pp,    /* OUT: Changeset iterator handle */
  int (*xInput)(void *pIn, void *pData, int *pnData),
  void *pIn
){
  return sessionChangesetStart(pp, xInput, pIn, 0, 0);
}

................................................................................
**
** Return an SQLite error code if an error occurs, or SQLITE_OK otherwise.
*/
static int sessionInputBuffer(SessionInput *pIn, int nByte){
  int rc = SQLITE_OK;
  if( pIn->xInput ){
    while( !pIn->bEof && (pIn->iNext+nByte)>=pIn->nData && rc==SQLITE_OK ){
      int nNew = SESSIONS_STR_CHUNK_SIZE;

      if( pIn->iNext>=SESSIONS_STR_CHUNK_SIZE ){
        int nMove = pIn->buf.nBuf - pIn->iNext;
        memmove(pIn->buf.aBuf, &pIn->buf.aBuf[pIn->iNext], nMove);
        pIn->buf.nBuf -= pIn->iNext;
        pIn->iNext = 0;
      }

      if( SQLITE_OK==sessionBufferGrow(&pIn->buf, nNew, &rc) ){
................................................................................

      default:
        rc = SQLITE_CORRUPT_BKPT;
        goto finished_invert;
    }

    assert( rc==SQLITE_OK );
    if( xOutput && sOut.nBuf>=SESSIONS_STR_CHUNK_SIZE ){
      rc = xOutput(pOut, sOut.aBuf, sOut.nBuf);
      sOut.nBuf = 0;
      if( rc!=SQLITE_OK ) goto finished_invert;
    }
  }

  assert( rc==SQLITE_OK );
................................................................................

  return sessionChangesetInvert(&sInput, 0, 0, pnInverted, ppInverted);
}

/*
** Streaming version of sqlite3changeset_invert().
*/
int sqlite3changeset_invert_str(
  int (*xInput)(void *pIn, void *pData, int *pnData),
  void *pIn,
  int (*xOutput)(void *pOut, const void *pData, int nData),
  void *pOut
){
  SessionInput sInput;
  int rc;
................................................................................
}

/*
** Apply the changeset passed via xInput/pIn to the main database
** attached to handle "db". Invoke the supplied conflict handler callback
** to resolve any conflicts encountered while applying the change.
*/
int sqlite3changeset_apply_str(
  sqlite3 *db,                    /* Apply change to "main" db of this handle */
  int (*xInput)(void *pIn, void *pData, int *pnData), /* Input function */
  void *pIn,                                          /* First arg for xInput */
  int(*xFilter)(
    void *pCtx,                   /* Copy of sixth arg to _apply() */
    const char *zTab              /* Table name */
  ),
................................................................................
    void *pCtx,                   /* Copy of sixth arg to _apply() */
    int eConflict,                /* DATA, MISSING, CONFLICT, CONSTRAINT */
    sqlite3_changeset_iter *p     /* Handle describing change and conflict */
  ),
  void *pCtx                      /* First argument passed to xConflict */
){
  sqlite3_changeset_iter *pIter;  /* Iterator to skip through changeset */  
  int rc = sqlite3changeset_start_str(&pIter, xInput, pIn);
  if( rc==SQLITE_OK ){
    rc = sessionChangesetApply(db, pIter, xFilter, xConflict, pCtx);
  }
  return rc;
}

/*
................................................................................
      for(p=pTab->apChange[i]; p; p=p->pNext){
        sessionAppendByte(&buf, p->op, &rc);
        sessionAppendByte(&buf, p->bIndirect, &rc);
        sessionAppendBlob(&buf, p->aRecord, p->nRecord, &rc);
      }
    }

    if( rc==SQLITE_OK && xOutput && buf.nBuf>=SESSIONS_STR_CHUNK_SIZE ){
      rc = xOutput(pOut, buf.aBuf, buf.nBuf);
      buf.nBuf = 0;
    }
  }

  if( rc==SQLITE_OK ){
    if( xOutput ){
................................................................................
  sqlite3changeset_finalize(pIter2);
  return rc;
}

/*
** Streaming version of sqlite3changeset_concat().
*/
int sqlite3changeset_concat_str(
  int (*xInputA)(void *pIn, void *pData, int *pnData),
  void *pInA,
  int (*xInputB)(void *pIn, void *pData, int *pnData),
  void *pInB,
  int (*xOutput)(void *pOut, const void *pData, int nData),
  void *pOut
){
  sqlite3_changeset_iter *pIter1 = 0;
  sqlite3_changeset_iter *pIter2 = 0;
  int rc;

  rc = sqlite3changeset_start_str(&pIter1, xInputA, pInA);
  if( rc==SQLITE_OK ){
    rc = sqlite3changeset_start_str(&pIter2, xInputB, pInB);
  }
  if( rc==SQLITE_OK ){
    rc = sessionChangesetConcat(pIter1, pIter2, xOutput, pOut, 0, 0);
  }

  sqlite3changeset_finalize(pIter1);
  sqlite3changeset_finalize(pIter2);
  return rc;
}

#endif /* SQLITE_ENABLE_SESSION && SQLITE_ENABLE_PREUPDATE_HOOK */







>
|
|
|
|
>







 







|







 







|




|







 







|










|







 







|







 







|







 







|

|







 







|







 







|







 







|







 







|







 







|







 







|











|

|











13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
..
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
....
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
....
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
....
2042
2043
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
....
2090
2091
2092
2093
2094
2095
2096
2097
2098
2099
2100
2101
2102
2103
2104
....
2109
2110
2111
2112
2113
2114
2115
2116
2117
2118
2119
2120
2121
2122
2123
2124
2125
....
2799
2800
2801
2802
2803
2804
2805
2806
2807
2808
2809
2810
2811
2812
2813
....
2845
2846
2847
2848
2849
2850
2851
2852
2853
2854
2855
2856
2857
2858
2859
....
3639
3640
3641
3642
3643
3644
3645
3646
3647
3648
3649
3650
3651
3652
3653
....
3655
3656
3657
3658
3659
3660
3661
3662
3663
3664
3665
3666
3667
3668
3669
....
3930
3931
3932
3933
3934
3935
3936
3937
3938
3939
3940
3941
3942
3943
3944
....
3984
3985
3986
3987
3988
3989
3990
3991
3992
3993
3994
3995
3996
3997
3998
3999
4000
4001
4002
4003
4004
4005
4006
4007
4008
4009
4010
4011
4012
4013
4014
4015
4016
typedef struct SessionChange SessionChange;
typedef struct SessionBuffer SessionBuffer;
typedef struct SessionInput SessionInput;

/*
** Minimum chunk size used by streaming versions of functions.
*/
#ifndef SESSIONS_STRM_CHUNK_SIZE
# ifdef SQLITE_TEST
#   define SESSIONS_STRM_CHUNK_SIZE 64
# else
#   define SESSIONS_STRM_CHUNK_SIZE 1024
# endif
#endif

/*
** Session handle structure.
*/
struct sqlite3_session {
  sqlite3 *db;                    /* Database handle session is attached to */
................................................................................
  int nAlloc;                     /* Size of allocation containing aBuf */
};

/*
** An object of this type is used internally as an abstraction for 
** input data. Input data may be supplied either as a single large buffer
** (e.g. sqlite3changeset_start()) or using a stream function (e.g.
**  sqlite3changeset_start_strm()).
*/
struct SessionInput {
  int iNext;                      /* Offset in aData[] of next change */
  u8 *aData;                      /* Pointer to buffer containing changeset */
  int nData;                      /* Number of bytes in aData */

  SessionBuffer buf;              /* Current read buffer */
................................................................................
          }else if( p->op!=SQLITE_INSERT ){
            rc = sessionAppendDelete(&buf, bPatchset, p, nCol, abPK);
          }
          if( rc==SQLITE_OK ){
            rc = sqlite3_reset(pSel);
          }

          /* If the buffer is now larger than SESSIONS_STRM_CHUNK_SIZE, pass
          ** its contents to the xOutput() callback. */
          if( xOutput 
           && rc==SQLITE_OK 
           && buf.nBuf>nNoop 
           && buf.nBuf>SESSIONS_STRM_CHUNK_SIZE 
          ){
            rc = xOutput(pOut, (void*)buf.aBuf, buf.nBuf);
            nNoop = -1;
            buf.nBuf = 0;
          }

        }
................................................................................
){
  return sessionGenerateChangeset(pSession, 0, 0, 0, pnChangeset, ppChangeset);
}

/*
** Streaming version of sqlite3session_changeset().
*/
int sqlite3session_changeset_strm(
  sqlite3_session *pSession,
  int (*xOutput)(void *pOut, const void *pData, int nData),
  void *pOut
){
  return sessionGenerateChangeset(pSession, 0, xOutput, pOut, 0, 0);
}

/*
** Streaming version of sqlite3session_patchset().
*/
int sqlite3session_patchset_strm(
  sqlite3_session *pSession,
  int (*xOutput)(void *pOut, const void *pData, int nData),
  void *pOut
){
  return sessionGenerateChangeset(pSession, 1, xOutput, pOut, 0, 0);
}

................................................................................
  }
  sqlite3_mutex_leave(sqlite3_db_mutex(pSession->db));

  return (ret==0);
}

/*
** Do the work for either sqlite3changeset_start() or start_strm().
*/
int sessionChangesetStart(
  sqlite3_changeset_iter **pp,    /* OUT: Changeset iterator handle */
  int (*xInput)(void *pIn, void *pData, int *pnData),
  void *pIn,
  int nChangeset,                 /* Size of buffer pChangeset in bytes */
  void *pChangeset                /* Pointer to buffer containing changeset */
................................................................................
){
  return sessionChangesetStart(pp, 0, 0, nChangeset, pChangeset);
}

/*
** Streaming version of sqlite3changeset_start().
*/
int sqlite3changeset_start_strm(
  sqlite3_changeset_iter **pp,    /* OUT: Changeset iterator handle */
  int (*xInput)(void *pIn, void *pData, int *pnData),
  void *pIn
){
  return sessionChangesetStart(pp, xInput, pIn, 0, 0);
}

................................................................................
**
** Return an SQLite error code if an error occurs, or SQLITE_OK otherwise.
*/
static int sessionInputBuffer(SessionInput *pIn, int nByte){
  int rc = SQLITE_OK;
  if( pIn->xInput ){
    while( !pIn->bEof && (pIn->iNext+nByte)>=pIn->nData && rc==SQLITE_OK ){
      int nNew = SESSIONS_STRM_CHUNK_SIZE;

      if( pIn->iNext>=SESSIONS_STRM_CHUNK_SIZE ){
        int nMove = pIn->buf.nBuf - pIn->iNext;
        memmove(pIn->buf.aBuf, &pIn->buf.aBuf[pIn->iNext], nMove);
        pIn->buf.nBuf -= pIn->iNext;
        pIn->iNext = 0;
      }

      if( SQLITE_OK==sessionBufferGrow(&pIn->buf, nNew, &rc) ){
................................................................................

      default:
        rc = SQLITE_CORRUPT_BKPT;
        goto finished_invert;
    }

    assert( rc==SQLITE_OK );
    if( xOutput && sOut.nBuf>=SESSIONS_STRM_CHUNK_SIZE ){
      rc = xOutput(pOut, sOut.aBuf, sOut.nBuf);
      sOut.nBuf = 0;
      if( rc!=SQLITE_OK ) goto finished_invert;
    }
  }

  assert( rc==SQLITE_OK );
................................................................................

  return sessionChangesetInvert(&sInput, 0, 0, pnInverted, ppInverted);
}

/*
** Streaming version of sqlite3changeset_invert().
*/
int sqlite3changeset_invert_strm(
  int (*xInput)(void *pIn, void *pData, int *pnData),
  void *pIn,
  int (*xOutput)(void *pOut, const void *pData, int nData),
  void *pOut
){
  SessionInput sInput;
  int rc;
................................................................................
}

/*
** Apply the changeset passed via xInput/pIn to the main database
** attached to handle "db". Invoke the supplied conflict handler callback
** to resolve any conflicts encountered while applying the change.
*/
int sqlite3changeset_apply_strm(
  sqlite3 *db,                    /* Apply change to "main" db of this handle */
  int (*xInput)(void *pIn, void *pData, int *pnData), /* Input function */
  void *pIn,                                          /* First arg for xInput */
  int(*xFilter)(
    void *pCtx,                   /* Copy of sixth arg to _apply() */
    const char *zTab              /* Table name */
  ),
................................................................................
    void *pCtx,                   /* Copy of sixth arg to _apply() */
    int eConflict,                /* DATA, MISSING, CONFLICT, CONSTRAINT */
    sqlite3_changeset_iter *p     /* Handle describing change and conflict */
  ),
  void *pCtx                      /* First argument passed to xConflict */
){
  sqlite3_changeset_iter *pIter;  /* Iterator to skip through changeset */  
  int rc = sqlite3changeset_start_strm(&pIter, xInput, pIn);
  if( rc==SQLITE_OK ){
    rc = sessionChangesetApply(db, pIter, xFilter, xConflict, pCtx);
  }
  return rc;
}

/*
................................................................................
      for(p=pTab->apChange[i]; p; p=p->pNext){
        sessionAppendByte(&buf, p->op, &rc);
        sessionAppendByte(&buf, p->bIndirect, &rc);
        sessionAppendBlob(&buf, p->aRecord, p->nRecord, &rc);
      }
    }

    if( rc==SQLITE_OK && xOutput && buf.nBuf>=SESSIONS_STRM_CHUNK_SIZE ){
      rc = xOutput(pOut, buf.aBuf, buf.nBuf);
      buf.nBuf = 0;
    }
  }

  if( rc==SQLITE_OK ){
    if( xOutput ){
................................................................................
  sqlite3changeset_finalize(pIter2);
  return rc;
}

/*
** Streaming version of sqlite3changeset_concat().
*/
int sqlite3changeset_concat_strm(
  int (*xInputA)(void *pIn, void *pData, int *pnData),
  void *pInA,
  int (*xInputB)(void *pIn, void *pData, int *pnData),
  void *pInB,
  int (*xOutput)(void *pOut, const void *pData, int nData),
  void *pOut
){
  sqlite3_changeset_iter *pIter1 = 0;
  sqlite3_changeset_iter *pIter2 = 0;
  int rc;

  rc = sqlite3changeset_start_strm(&pIter1, xInputA, pInA);
  if( rc==SQLITE_OK ){
    rc = sqlite3changeset_start_strm(&pIter2, xInputB, pInB);
  }
  if( rc==SQLITE_OK ){
    rc = sessionChangesetConcat(pIter1, pIter2, xOutput, pOut, 0, 0);
  }

  sqlite3changeset_finalize(pIter1);
  sqlite3changeset_finalize(pIter2);
  return rc;
}

#endif /* SQLITE_ENABLE_SESSION && SQLITE_ENABLE_PREUPDATE_HOOK */

Changes to ext/session/sqlite3session.h.

942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
...
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
....
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
....
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
#define SQLITE_CHANGESET_OMIT       0
#define SQLITE_CHANGESET_REPLACE    1
#define SQLITE_CHANGESET_ABORT      2

/*
** CAPI3REF: Streaming Versions of API functions.
**
** The six streaming API xxx_str() functions serve similar purposes to the 
** corresponding non-streaming API functions:
**
** <table border=1 style="margin-left:8ex;margin-right:8ex">
**   <tr><th>Streaming function<th>Non-streaming equivalent</th>
**   <tr><td>sqlite3changeset_apply_str<td>[sqlite3changeset_apply] 
**   <tr><td>sqlite3changeset_concat_str<td>[sqlite3changeset_concat] 
**   <tr><td>sqlite3changeset_invert_str<td>[sqlite3changeset_invert] 
................................................................................
** into the buffer and set (*pnData) to the actual number of bytes copied 
** before returning SQLITE_OK. If the input is completely exhausted, (*pnData) 
** should be set to zero to indicate this. Or, if an error occurs, an SQLite 
** error code should be returned. In all cases, if an xInput callback returns
** an error, all processing is abandoned and the streaming API function
** returns a copy of the error code to the caller.
**
** In the case of sqlite3changeset_start_str(), the xInput callback may be
** invoked by the sessions module at any point during the lifetime of the
** iterator. If such an xInput callback returns an error, the iterator enters
** an error state, whereby all subsequent calls to iterator functions 
** immediately fail with the same error code as returned by xInput.
**
** Similarly, streaming API functions that return changesets (or patchsets)
** return them in chunks by way of a callback function instead of via a
................................................................................
** is immediately abandoned and the streaming API function returns a copy
** of the xOutput error code to the application.
**
** The sessions module never invokes an xOutput callback with the third 
** parameter set to a value less than or equal to zero. Other than this,
** no guarantees are made as to the size of the chunks of data returned.
*/
int sqlite3changeset_apply_str(
  sqlite3 *db,                    /* Apply change to "main" db of this handle */
  int (*xInput)(void *pIn, void *pData, int *pnData), /* Input function */
  void *pIn,                                          /* First arg for xInput */
  int(*xFilter)(
    void *pCtx,                   /* Copy of sixth arg to _apply() */
    const char *zTab              /* Table name */
  ),
................................................................................
  int(*xConflict)(
    void *pCtx,                   /* Copy of sixth arg to _apply() */
    int eConflict,                /* DATA, MISSING, CONFLICT, CONSTRAINT */
    sqlite3_changeset_iter *p     /* Handle describing change and conflict */
  ),
  void *pCtx                      /* First argument passed to xConflict */
);
int sqlite3changeset_concat_str(
  int (*xInputA)(void *pIn, void *pData, int *pnData),
  void *pInA,
  int (*xInputB)(void *pIn, void *pData, int *pnData),
  void *pInB,
  int (*xOutput)(void *pOut, const void *pData, int nData),
  void *pOut
);
int sqlite3changeset_invert_str(
  int (*xInput)(void *pIn, void *pData, int *pnData),
  void *pIn,
  int (*xOutput)(void *pOut, const void *pData, int nData),
  void *pOut
);
int sqlite3changeset_start_str(
  sqlite3_changeset_iter **pp,
  int (*xInput)(void *pIn, void *pData, int *pnData),
  void *pIn
);
int sqlite3session_changeset_str(
  sqlite3_session *pSession,
  int (*xOutput)(void *pOut, const void *pData, int nData),
  void *pOut
);
int sqlite3session_patchset_str(
  sqlite3_session *pSession,
  int (*xOutput)(void *pOut, const void *pData, int nData),
  void *pOut
);


/*







|







 







|







 







|







 







|







|





|




|




|







942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
...
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
....
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
....
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
#define SQLITE_CHANGESET_OMIT       0
#define SQLITE_CHANGESET_REPLACE    1
#define SQLITE_CHANGESET_ABORT      2

/*
** CAPI3REF: Streaming Versions of API functions.
**
** The six streaming API xxx_strm() functions serve similar purposes to the 
** corresponding non-streaming API functions:
**
** <table border=1 style="margin-left:8ex;margin-right:8ex">
**   <tr><th>Streaming function<th>Non-streaming equivalent</th>
**   <tr><td>sqlite3changeset_apply_str<td>[sqlite3changeset_apply] 
**   <tr><td>sqlite3changeset_concat_str<td>[sqlite3changeset_concat] 
**   <tr><td>sqlite3changeset_invert_str<td>[sqlite3changeset_invert] 
................................................................................
** into the buffer and set (*pnData) to the actual number of bytes copied 
** before returning SQLITE_OK. If the input is completely exhausted, (*pnData) 
** should be set to zero to indicate this. Or, if an error occurs, an SQLite 
** error code should be returned. In all cases, if an xInput callback returns
** an error, all processing is abandoned and the streaming API function
** returns a copy of the error code to the caller.
**
** In the case of sqlite3changeset_start_strm(), the xInput callback may be
** invoked by the sessions module at any point during the lifetime of the
** iterator. If such an xInput callback returns an error, the iterator enters
** an error state, whereby all subsequent calls to iterator functions 
** immediately fail with the same error code as returned by xInput.
**
** Similarly, streaming API functions that return changesets (or patchsets)
** return them in chunks by way of a callback function instead of via a
................................................................................
** is immediately abandoned and the streaming API function returns a copy
** of the xOutput error code to the application.
**
** The sessions module never invokes an xOutput callback with the third 
** parameter set to a value less than or equal to zero. Other than this,
** no guarantees are made as to the size of the chunks of data returned.
*/
int sqlite3changeset_apply_strm(
  sqlite3 *db,                    /* Apply change to "main" db of this handle */
  int (*xInput)(void *pIn, void *pData, int *pnData), /* Input function */
  void *pIn,                                          /* First arg for xInput */
  int(*xFilter)(
    void *pCtx,                   /* Copy of sixth arg to _apply() */
    const char *zTab              /* Table name */
  ),
................................................................................
  int(*xConflict)(
    void *pCtx,                   /* Copy of sixth arg to _apply() */
    int eConflict,                /* DATA, MISSING, CONFLICT, CONSTRAINT */
    sqlite3_changeset_iter *p     /* Handle describing change and conflict */
  ),
  void *pCtx                      /* First argument passed to xConflict */
);
int sqlite3changeset_concat_strm(
  int (*xInputA)(void *pIn, void *pData, int *pnData),
  void *pInA,
  int (*xInputB)(void *pIn, void *pData, int *pnData),
  void *pInB,
  int (*xOutput)(void *pOut, const void *pData, int nData),
  void *pOut
);
int sqlite3changeset_invert_strm(
  int (*xInput)(void *pIn, void *pData, int *pnData),
  void *pIn,
  int (*xOutput)(void *pOut, const void *pData, int nData),
  void *pOut
);
int sqlite3changeset_start_strm(
  sqlite3_changeset_iter **pp,
  int (*xInput)(void *pIn, void *pData, int *pnData),
  void *pIn
);
int sqlite3session_changeset_strm(
  sqlite3_session *pSession,
  int (*xOutput)(void *pOut, const void *pData, int nData),
  void *pOut
);
int sqlite3session_patchset_strm(
  sqlite3_session *pSession,
  int (*xOutput)(void *pOut, const void *pData, int nData),
  void *pOut
);


/*

Changes to ext/session/test_session.c.

156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
...
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
...
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
...
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
...
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811

    case 7:        /* patchset */
    case 1: {      /* changeset */
      TestSessionsBlob o = {0, 0};
      if( test_tcl_integer(interp, SESSION_STREAM_TCL_VAR) ){
        void *pCtx = (void*)&o;
        if( iSub==7 ){
          rc = sqlite3session_patchset_str(pSession, testStreamOutput, pCtx);
        }else{
          rc = sqlite3session_changeset_str(pSession, testStreamOutput, pCtx);
        }
      }else{
        if( iSub==7 ){
          rc = sqlite3session_patchset(pSession, &o.n, &o.p);
        }else{
          rc = sqlite3session_changeset(pSession, &o.n, &o.p);
        }
................................................................................
  if( sStr.nStream==0 ){
    rc = sqlite3changeset_apply(db, nChangeset, pChangeset, 
        (objc==5) ? test_filter_handler : 0, test_conflict_handler, (void *)&ctx
    );
  }else{
    sStr.aData = (unsigned char*)pChangeset;
    sStr.nData = nChangeset;
    rc = sqlite3changeset_apply_str(db, testStreamInput, (void*)&sStr,
        (objc==5) ? test_filter_handler : 0, test_conflict_handler, (void *)&ctx
    );
  }

  if( rc!=SQLITE_OK ){
    return test_session_error(interp, rc);
  }
................................................................................

  memset(&sIn, 0, sizeof(sIn));
  memset(&sOut, 0, sizeof(sOut));
  sIn.nStream = test_tcl_integer(interp, SESSION_STREAM_TCL_VAR);
  sIn.aData = Tcl_GetByteArrayFromObj(objv[1], &sIn.nData);

  if( sIn.nStream ){
    rc = sqlite3changeset_invert_str(
        testStreamInput, (void*)&sIn, testStreamOutput, (void*)&sOut
    );
  }else{
    rc = sqlite3changeset_invert(sIn.nData, sIn.aData, &sOut.n, &sOut.p);
  }
  if( rc!=SQLITE_OK ){
    rc = test_session_error(interp, rc);
................................................................................
  memset(&sRight, 0, sizeof(sRight));
  sLeft.aData = Tcl_GetByteArrayFromObj(objv[1], &sLeft.nData);
  sRight.aData = Tcl_GetByteArrayFromObj(objv[2], &sRight.nData);
  sLeft.nStream = test_tcl_integer(interp, SESSION_STREAM_TCL_VAR);
  sRight.nStream = sLeft.nStream;

  if( sLeft.nStream>0 ){
    rc = sqlite3changeset_concat_str(
        testStreamInput, (void*)&sLeft,
        testStreamInput, (void*)&sRight,
        testStreamOutput, (void*)&sOut
    );
  }else{
    rc = sqlite3changeset_concat(
        sLeft.nData, sLeft.aData, sRight.nData, sRight.aData, &sOut.n, &sOut.p
................................................................................
  pChangeset = (void *)Tcl_GetByteArrayFromObj(pCS, &nChangeset);
  sStr.nStream = test_tcl_integer(interp, SESSION_STREAM_TCL_VAR);
  if( sStr.nStream==0 ){
    rc = sqlite3changeset_start(&pIter, nChangeset, pChangeset);
  }else{
    sStr.aData = (unsigned char*)pChangeset;
    sStr.nData = nChangeset;
    rc = sqlite3changeset_start_str(&pIter, testStreamInput, (void*)&sStr);
  }
  if( rc!=SQLITE_OK ){
    return test_session_error(interp, rc);
  }

  while( SQLITE_ROW==sqlite3changeset_next(pIter) ){
    int nCol;                     /* Number of columns in table */







|

|







 







|







 







|







 







|







 







|







156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
...
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
...
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
...
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
...
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811

    case 7:        /* patchset */
    case 1: {      /* changeset */
      TestSessionsBlob o = {0, 0};
      if( test_tcl_integer(interp, SESSION_STREAM_TCL_VAR) ){
        void *pCtx = (void*)&o;
        if( iSub==7 ){
          rc = sqlite3session_patchset_strm(pSession, testStreamOutput, pCtx);
        }else{
          rc = sqlite3session_changeset_strm(pSession, testStreamOutput, pCtx);
        }
      }else{
        if( iSub==7 ){
          rc = sqlite3session_patchset(pSession, &o.n, &o.p);
        }else{
          rc = sqlite3session_changeset(pSession, &o.n, &o.p);
        }
................................................................................
  if( sStr.nStream==0 ){
    rc = sqlite3changeset_apply(db, nChangeset, pChangeset, 
        (objc==5) ? test_filter_handler : 0, test_conflict_handler, (void *)&ctx
    );
  }else{
    sStr.aData = (unsigned char*)pChangeset;
    sStr.nData = nChangeset;
    rc = sqlite3changeset_apply_strm(db, testStreamInput, (void*)&sStr,
        (objc==5) ? test_filter_handler : 0, test_conflict_handler, (void *)&ctx
    );
  }

  if( rc!=SQLITE_OK ){
    return test_session_error(interp, rc);
  }
................................................................................

  memset(&sIn, 0, sizeof(sIn));
  memset(&sOut, 0, sizeof(sOut));
  sIn.nStream = test_tcl_integer(interp, SESSION_STREAM_TCL_VAR);
  sIn.aData = Tcl_GetByteArrayFromObj(objv[1], &sIn.nData);

  if( sIn.nStream ){
    rc = sqlite3changeset_invert_strm(
        testStreamInput, (void*)&sIn, testStreamOutput, (void*)&sOut
    );
  }else{
    rc = sqlite3changeset_invert(sIn.nData, sIn.aData, &sOut.n, &sOut.p);
  }
  if( rc!=SQLITE_OK ){
    rc = test_session_error(interp, rc);
................................................................................
  memset(&sRight, 0, sizeof(sRight));
  sLeft.aData = Tcl_GetByteArrayFromObj(objv[1], &sLeft.nData);
  sRight.aData = Tcl_GetByteArrayFromObj(objv[2], &sRight.nData);
  sLeft.nStream = test_tcl_integer(interp, SESSION_STREAM_TCL_VAR);
  sRight.nStream = sLeft.nStream;

  if( sLeft.nStream>0 ){
    rc = sqlite3changeset_concat_strm(
        testStreamInput, (void*)&sLeft,
        testStreamInput, (void*)&sRight,
        testStreamOutput, (void*)&sOut
    );
  }else{
    rc = sqlite3changeset_concat(
        sLeft.nData, sLeft.aData, sRight.nData, sRight.aData, &sOut.n, &sOut.p
................................................................................
  pChangeset = (void *)Tcl_GetByteArrayFromObj(pCS, &nChangeset);
  sStr.nStream = test_tcl_integer(interp, SESSION_STREAM_TCL_VAR);
  if( sStr.nStream==0 ){
    rc = sqlite3changeset_start(&pIter, nChangeset, pChangeset);
  }else{
    sStr.aData = (unsigned char*)pChangeset;
    sStr.nData = nChangeset;
    rc = sqlite3changeset_start_strm(&pIter, testStreamInput, (void*)&sStr);
  }
  if( rc!=SQLITE_OK ){
    return test_session_error(interp, rc);
  }

  while( SQLITE_ROW==sqlite3changeset_next(pIter) ){
    int nCol;                     /* Number of columns in table */

Changes to test/permutations.test.

934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
  All session module related tests with sqlite3_extended_result_codes() set. 
} -files [
  glob -nocomplain $::testdir/../ext/session/*.test
] -dbconfig {
  sqlite3_extended_result_codes $::dbhandle 1
}

test_suite "session_str" -description {
  All session module related tests using the streaming APIs.
} -files [
  glob -nocomplain $::testdir/../ext/session/*.test
] -dbconfig {
  set ::sqlite3session_streams 1
}








|







934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
  All session module related tests with sqlite3_extended_result_codes() set. 
} -files [
  glob -nocomplain $::testdir/../ext/session/*.test
] -dbconfig {
  sqlite3_extended_result_codes $::dbhandle 1
}

test_suite "session_strm" -description {
  All session module related tests using the streaming APIs.
} -files [
  glob -nocomplain $::testdir/../ext/session/*.test
] -dbconfig {
  set ::sqlite3session_streams 1
}

Changes to test/session.test.

12
13
14
15
16
17
18
19
20
21
22
source $testdir/permutations.test

ifcapable session {
  # First run tests with sqlite3_extended_error_codes() set, then
  # again with it clear.
  run_test_suite session_eec
  run_test_suite session
  run_test_suite session_str
}

finish_test







|



12
13
14
15
16
17
18
19
20
21
22
source $testdir/permutations.test

ifcapable session {
  # First run tests with sqlite3_extended_error_codes() set, then
  # again with it clear.
  run_test_suite session_eec
  run_test_suite session
  run_test_suite session_strm
}

finish_test