/ Check-in [3c7d3d95]
Login
SQLite training in Houston TX on 2019-11-05 (details)
Part of the 2019 Tcl Conference

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Begin adding 'streaming' APIs to sessions module. This is a work in progress.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | sessions
Files: files | file ages | folders
SHA1: 3c7d3d950bbf5f5ed3696ebc61c77ca48bafe2b5
User & Date: dan 2014-09-23 20:39:55
Context
2014-09-24
17:13
Add streaming version of sqlite3changeset_apply(). Tests and fixes for the same and sqlite3changeset_start_str(). check-in: b917fc14 user: dan tags: sessions
2014-09-23
20:39
Begin adding 'streaming' APIs to sessions module. This is a work in progress. check-in: 3c7d3d95 user: dan tags: sessions
2014-09-21
22:49
Merge all recent trunk changes into the sessions branch. check-in: 6406b77f user: drh tags: sessions
Changes
Hide Diffs Side-by-Side Diffs Ignore Whitespace Patch

Changes to ext/session/sqlite3session.c.

     8      8   # include "sqliteInt.h"
     9      9   # include "vdbeInt.h"
    10     10   #endif
    11     11   
    12     12   typedef struct SessionTable SessionTable;
    13     13   typedef struct SessionChange SessionChange;
    14     14   typedef struct SessionBuffer SessionBuffer;
           15  +typedef struct SessionInput SessionInput;
           16  +
           17  +/*
           18  +** Minimum chunk size used by streaming versions of functions.
           19  +*/
           20  +#define SESSIONS_STR_CHUNK_SIZE 1024
    15     21   
    16     22   /*
    17     23   ** Session handle structure.
    18     24   */
    19     25   struct sqlite3_session {
    20     26     sqlite3 *db;                    /* Database handle session is attached to */
    21     27     char *zDb;                      /* Name of database session is attached to */
................................................................................
    25     31     int rc;                         /* Non-zero if an error has occurred */
    26     32     void *pFilterCtx;               /* First argument to pass to xTableFilter */
    27     33     int (*xTableFilter)(void *pCtx, const char *zTab);
    28     34     sqlite3_session *pNext;         /* Next session object on same db. */
    29     35     SessionTable *pTable;           /* List of attached tables */
    30     36   };
    31     37   
           38  +/*
           39  +** Instances of this structure are used to build strings or binary records.
           40  +*/
           41  +struct SessionBuffer {
           42  +  u8 *aBuf;                       /* Pointer to changeset buffer */
           43  +  int nBuf;                       /* Size of buffer aBuf */
           44  +  int nAlloc;                     /* Size of allocation containing aBuf */
           45  +};
           46  +
           47  +/*
           48  +** An object of this type is used internally as an abstraction for the 
           49  +** input data read by changeset iterators. Input data may be supplied 
           50  +** either as a single large buffer (sqlite3changeset_start()) or using
           51  +** a stream function (sqlite3changeset_start_str()).
           52  +*/
           53  +struct SessionInput {
           54  +  int iNext;                      /* Offset in aChangeset[] of next change */
           55  +  u8 *aChangeset;                 /* Pointer to buffer containing changeset */
           56  +  int nChangeset;                 /* Number of bytes in aChangeset */
           57  +  SessionBuffer buf;              /* Current read buffer */
           58  +  int (*xInput)(void*, void*, int*);        /* Input stream call (or NULL) */
           59  +  void *pIn;                                /* First argument to xInput */
           60  +  int bEof;                       /* Set to true after xInput finished */
           61  +};
           62  +
    32     63   /*
    33     64   ** Structure for changeset iterators.
    34     65   */
    35     66   struct sqlite3_changeset_iter {
    36         -  u8 *aChangeset;                 /* Pointer to buffer containing changeset */
    37         -  int nChangeset;                 /* Number of bytes in aChangeset */
           67  +  SessionInput in;                /* Input buffer or stream */
           68  +  SessionBuffer tblhdr;           /* Buffer to hold apValue/zTab/abPK/ */
    38     69     int bPatchset;                  /* True if this is a patchset */
    39         -  u8 *pNext;                      /* Pointer to next change within aChangeset */
    40     70     int rc;                         /* Iterator error code */
    41     71     sqlite3_stmt *pConflict;        /* Points to conflicting row, if any */
    42     72     char *zTab;                     /* Current table */
    43     73     int nCol;                       /* Number of columns in zTab */
    44     74     int op;                         /* Current operation */
    45     75     int bIndirect;                  /* True if current change was indirect */
    46     76     u8 *abPK;                       /* Primary key array */
................................................................................
   161    191     int op;                         /* One of UPDATE, DELETE, INSERT */
   162    192     int bIndirect;                  /* True if this change is "indirect" */
   163    193     int nRecord;                    /* Number of bytes in buffer aRecord[] */
   164    194     u8 *aRecord;                    /* Buffer containing old.* record */
   165    195     SessionChange *pNext;           /* For hash-table collisions */
   166    196   };
   167    197   
   168         -/*
   169         -** Instances of this structure are used to build strings or binary records.
   170         -*/
   171         -struct SessionBuffer {
   172         -  u8 *aBuf;                       /* Pointer to changeset buffer */
   173         -  int nBuf;                       /* Size of buffer aBuf */
   174         -  int nAlloc;                     /* Size of allocation containing aBuf */
   175         -};
   176         -
   177    198   /*
   178    199   ** Write a varint with value iVal into the buffer at aBuf. Return the 
   179    200   ** number of bytes written.
   180    201   */
   181    202   static int sessionVarintPut(u8 *aBuf, int iVal){
   182    203     return putVarint32(aBuf, iVal);
   183    204   }
................................................................................
  1287   1308   */
  1288   1309   static int sessionBufferGrow(SessionBuffer *p, int nByte, int *pRc){
  1289   1310     if( *pRc==SQLITE_OK && p->nAlloc-p->nBuf<nByte ){
  1290   1311       u8 *aNew;
  1291   1312       int nNew = p->nAlloc ? p->nAlloc : 128;
  1292   1313       do {
  1293   1314         nNew = nNew*2;
  1294         -    }while( nNew<(p->nAlloc+nByte) );
         1315  +    }while( nNew<(p->nBuf+nByte) );
  1295   1316   
  1296   1317       aNew = (u8 *)sqlite3_realloc(p->aBuf, nNew);
  1297   1318       if( 0==aNew ){
  1298   1319         *pRc = SQLITE_NOMEM;
  1299   1320       }else{
  1300   1321         p->aBuf = aNew;
  1301   1322         p->nAlloc = nNew;
................................................................................
  1772   1793   ** stored in output variables *pnChangeset and *ppChangeset. Or, if an error
  1773   1794   ** occurs, an SQLite error code is returned and both output variables set 
  1774   1795   ** to 0.
  1775   1796   */
  1776   1797   int sessionGenerateChangeset(
  1777   1798     sqlite3_session *pSession,      /* Session object */
  1778   1799     int bPatchset,                  /* True for patchset, false for changeset */
         1800  +  int (*xOutput)(void *pOut, const void *pData, int nData),
         1801  +  void *pOut,                     /* First argument for xOutput */
  1779   1802     int *pnChangeset,               /* OUT: Size of buffer at *ppChangeset */
  1780   1803     void **ppChangeset              /* OUT: Buffer containing changeset */
  1781   1804   ){
  1782   1805     sqlite3 *db = pSession->db;     /* Source database handle */
  1783   1806     SessionTable *pTab;             /* Used to iterate through attached tables */
  1784   1807     SessionBuffer buf = {0,0,0};    /* Buffer in which to accumlate changeset */
  1785   1808     int rc;                         /* Return code */
         1809  +
         1810  +  assert( xOutput==0 || (pnChangeset==0 && ppChangeset==0 ) );
  1786   1811   
  1787   1812     /* Zero the output variables in case an error occurs. If this session
  1788   1813     ** object is already in the error state (sqlite3_session.rc != SQLITE_OK),
  1789   1814     ** this call will be a no-op.  */
  1790         -  *pnChangeset = 0;
  1791         -  *ppChangeset = 0;
         1815  +  if( xOutput==0 ){
         1816  +    *pnChangeset = 0;
         1817  +    *ppChangeset = 0;
         1818  +  }
  1792   1819   
  1793   1820     if( pSession->rc ) return pSession->rc;
  1794   1821     rc = sqlite3_exec(pSession->db, "SAVEPOINT changeset", 0, 0, 0);
  1795   1822     if( rc!=SQLITE_OK ) return rc;
  1796   1823   
  1797   1824     sqlite3_mutex_enter(sqlite3_db_mutex(db));
  1798   1825   
................................................................................
  1842   1869               }
  1843   1870             }else if( p->op!=SQLITE_INSERT ){
  1844   1871               rc = sessionAppendDelete(&buf, bPatchset, p, nCol, abPK);
  1845   1872             }
  1846   1873             if( rc==SQLITE_OK ){
  1847   1874               rc = sqlite3_reset(pSel);
  1848   1875             }
         1876  +
         1877  +          /* If the buffer is now larger than SESSIONS_STR_CHUNK_SIZE, pass
         1878  +          ** its contents to the xOutput() callback. */
         1879  +          if( xOutput 
         1880  +           && rc==SQLITE_OK 
         1881  +           && buf.nBuf>nNoop 
         1882  +           && buf.nBuf>SESSIONS_STR_CHUNK_SIZE 
         1883  +          ){
         1884  +            rc = xOutput(pOut, (void*)buf.aBuf, buf.nBuf);
         1885  +            nNoop = -1;
         1886  +            buf.nBuf = 0;
         1887  +          }
         1888  +
  1849   1889           }
  1850   1890         }
  1851   1891   
  1852   1892         sqlite3_finalize(pSel);
  1853   1893         if( buf.nBuf==nNoop ){
  1854   1894           buf.nBuf = nRewind;
  1855   1895         }
  1856   1896         sqlite3_free((char*)azCol);  /* cast works around VC++ bug */
  1857   1897       }
  1858   1898     }
  1859   1899   
  1860   1900     if( rc==SQLITE_OK ){
  1861         -    *pnChangeset = buf.nBuf;
  1862         -    *ppChangeset = buf.aBuf;
  1863         -  }else{
  1864         -    sqlite3_free(buf.aBuf);
         1901  +    if( xOutput==0 ){
         1902  +      *pnChangeset = buf.nBuf;
         1903  +      *ppChangeset = buf.aBuf;
         1904  +      buf.aBuf = 0;
         1905  +    }else if( buf.nBuf>0 ){
         1906  +      rc = xOutput(pOut, (void*)buf.aBuf, buf.nBuf);
         1907  +    }
  1865   1908     }
  1866   1909   
         1910  +  sqlite3_free(buf.aBuf);
  1867   1911     sqlite3_exec(db, "RELEASE changeset", 0, 0, 0);
  1868   1912     sqlite3_mutex_leave(sqlite3_db_mutex(db));
  1869   1913     return rc;
  1870   1914   }
  1871   1915   
  1872   1916   /*
  1873   1917   ** Obtain a changeset object containing all changes recorded by the 
................................................................................
  1877   1921   ** using sqlite3_free().
  1878   1922   */
  1879   1923   int sqlite3session_changeset(
  1880   1924     sqlite3_session *pSession,      /* Session object */
  1881   1925     int *pnChangeset,               /* OUT: Size of buffer at *ppChangeset */
  1882   1926     void **ppChangeset              /* OUT: Buffer containing changeset */
  1883   1927   ){
  1884         -  return sessionGenerateChangeset(pSession, 0, pnChangeset, ppChangeset);
         1928  +  return sessionGenerateChangeset(pSession, 0, 0, 0, pnChangeset, ppChangeset);
         1929  +}
         1930  +
         1931  +/*
         1932  +** Streaming version of sqlite3session_changeset().
         1933  +*/
         1934  +int sqlite3session_changeset_str(
         1935  +  sqlite3_session *pSession,
         1936  +  int (*xOutput)(void *pOut, const void *pData, int nData),
         1937  +  void *pOut
         1938  +){
         1939  +  return sessionGenerateChangeset(pSession, 0, xOutput, pOut, 0, 0);
         1940  +}
         1941  +
         1942  +/*
         1943  +** Streaming version of sqlite3session_patchset().
         1944  +*/
         1945  +int sqlite3session_patchset_str(
         1946  +  sqlite3_session *pSession,
         1947  +  int (*xOutput)(void *pOut, const void *pData, int nData),
         1948  +  void *pOut
         1949  +){
         1950  +  return sessionGenerateChangeset(pSession, 1, xOutput, pOut, 0, 0);
  1885   1951   }
  1886   1952   
  1887   1953   /*
  1888   1954   ** Obtain a patchset object containing all changes recorded by the 
  1889   1955   ** session object passed as the first argument.
  1890   1956   **
  1891   1957   ** It is the responsibility of the caller to eventually free the buffer 
................................................................................
  1892   1958   ** using sqlite3_free().
  1893   1959   */
  1894   1960   int sqlite3session_patchset(
  1895   1961     sqlite3_session *pSession,      /* Session object */
  1896   1962     int *pnPatchset,                /* OUT: Size of buffer at *ppChangeset */
  1897   1963     void **ppPatchset               /* OUT: Buffer containing changeset */
  1898   1964   ){
  1899         -  return sessionGenerateChangeset(pSession, 1, pnPatchset, ppPatchset);
         1965  +  return sessionGenerateChangeset(pSession, 1, 0, 0, pnPatchset, ppPatchset);
  1900   1966   }
  1901   1967   
  1902   1968   /*
  1903   1969   ** Enable or disable the session object passed as the first argument.
  1904   1970   */
  1905   1971   int sqlite3session_enable(sqlite3_session *pSession, int bEnable){
  1906   1972     int ret;
................................................................................
  1941   2007     }
  1942   2008     sqlite3_mutex_leave(sqlite3_db_mutex(pSession->db));
  1943   2009   
  1944   2010     return (ret==0);
  1945   2011   }
  1946   2012   
  1947   2013   /*
  1948         -** Create an iterator used to iterate through the contents of a changeset.
         2014  +** Do the work for either sqlite3changeset_start() or start_str().
  1949   2015   */
  1950         -int sqlite3changeset_start(
         2016  +int sessionChangesetStart(
  1951   2017     sqlite3_changeset_iter **pp,    /* OUT: Changeset iterator handle */
         2018  +  int (*xInput)(void *pIn, void *pData, int *pnData),
         2019  +  void *pIn,
  1952   2020     int nChangeset,                 /* Size of buffer pChangeset in bytes */
  1953   2021     void *pChangeset                /* Pointer to buffer containing changeset */
  1954   2022   ){
  1955   2023     sqlite3_changeset_iter *pRet;   /* Iterator to return */
  1956   2024     int nByte;                      /* Number of bytes to allocate for iterator */
         2025  +
         2026  +  assert( xInput==0 || (pChangeset==0 && nChangeset==0) );
  1957   2027   
  1958   2028     /* Zero the output variable in case an error occurs. */
  1959   2029     *pp = 0;
  1960   2030   
  1961   2031     /* Allocate and initialize the iterator structure. */
  1962   2032     nByte = sizeof(sqlite3_changeset_iter);
  1963   2033     pRet = (sqlite3_changeset_iter *)sqlite3_malloc(nByte);
  1964   2034     if( !pRet ) return SQLITE_NOMEM;
  1965   2035     memset(pRet, 0, sizeof(sqlite3_changeset_iter));
  1966         -  pRet->aChangeset = (u8 *)pChangeset;
  1967         -  pRet->nChangeset = nChangeset;
  1968         -  pRet->pNext = pRet->aChangeset;
         2036  +  pRet->in.aChangeset = (u8 *)pChangeset;
         2037  +  pRet->in.nChangeset = nChangeset;
         2038  +  pRet->in.xInput = xInput;
         2039  +  pRet->in.pIn = pIn;
         2040  +  pRet->in.iNext = 0;
         2041  +  pRet->in.bEof = (xInput ? 0 : 1);
  1969   2042   
  1970   2043     /* Populate the output variable and return success. */
  1971   2044     *pp = pRet;
  1972   2045     return SQLITE_OK;
  1973   2046   }
         2047  +
         2048  +/*
         2049  +** Create an iterator used to iterate through the contents of a changeset.
         2050  +*/
         2051  +int sqlite3changeset_start(
         2052  +  sqlite3_changeset_iter **pp,    /* OUT: Changeset iterator handle */
         2053  +  int nChangeset,                 /* Size of buffer pChangeset in bytes */
         2054  +  void *pChangeset                /* Pointer to buffer containing changeset */
         2055  +){
         2056  +  return sessionChangesetStart(pp, 0, 0, nChangeset, pChangeset);
         2057  +}
         2058  +
         2059  +/*
         2060  +** Streaming version of sqlite3changeset_start().
         2061  +*/
         2062  +int sqlite3changeset_start_str(
         2063  +  sqlite3_changeset_iter **pp,    /* OUT: Changeset iterator handle */
         2064  +  int (*xInput)(void *pIn, void *pData, int *pnData),
         2065  +  void *pIn
         2066  +){
         2067  +  return sessionChangesetStart(pp, xInput, pIn, 0, 0);
         2068  +}
         2069  +
         2070  +/*
         2071  +** Ensure that there are at least nByte bytes available in the buffer. Or,
         2072  +** if there are not nByte bytes remaining in the input, that all available
         2073  +** data is in the buffer.
         2074  +**
         2075  +** Return an SQLite error code if an error occurs, or SQLITE_OK otherwise.
         2076  +*/
         2077  +static int sessionInputBuffer(SessionInput *pInput, int nByte){
         2078  +  int rc = SQLITE_OK;
         2079  +  if( pInput->xInput && !pInput->bEof ){
         2080  +    assert( 0 );
         2081  +  }
         2082  +  return rc;
         2083  +}
         2084  +
         2085  +/*
         2086  +** When this function is called, *ppRec points to the start of a record
         2087  +** that contains nCol values. This function advances the pointer *ppRec
         2088  +** until it points to the byte immediately following that record.
         2089  +*/
         2090  +static void sessionSkipRecord(
         2091  +  u8 **ppRec,                     /* IN/OUT: Record pointer */
         2092  +  int nCol                        /* Number of values in record */
         2093  +){
         2094  +  u8 *aRec = *ppRec;
         2095  +  int i;
         2096  +  for(i=0; i<nCol; i++){
         2097  +    int eType = *aRec++;
         2098  +    if( eType==SQLITE_TEXT || eType==SQLITE_BLOB ){
         2099  +      int nByte;
         2100  +      aRec += sessionVarintGet((u8*)aRec, &nByte);
         2101  +      aRec += nByte;
         2102  +    }else if( eType==SQLITE_INTEGER || eType==SQLITE_FLOAT ){
         2103  +      aRec += 8;
         2104  +    }
         2105  +  }
         2106  +
         2107  +  *ppRec = aRec;
         2108  +}
  1974   2109   
  1975   2110   /*
  1976   2111   ** Deserialize a single record from a buffer in memory. See "RECORD FORMAT"
  1977   2112   ** for details.
  1978   2113   **
  1979   2114   ** When this function is called, *paChange points to the start of the record
  1980   2115   ** to deserialize. Assuming no error occurs, *paChange is set to point to
................................................................................
  1993   2128   ** It is the responsibility of the caller to free all sqlite_value structures
  1994   2129   ** using sqlite3_free().
  1995   2130   **
  1996   2131   ** If an error occurs, an SQLite error code (e.g. SQLITE_NOMEM) is returned.
  1997   2132   ** The apOut[] array may have been partially populated in this case.
  1998   2133   */
  1999   2134   static int sessionReadRecord(
  2000         -  u8 **paChange,                  /* IN/OUT: Pointer to binary record */
         2135  +  SessionInput *pIn,              /* Input data */
  2001   2136     int nCol,                       /* Number of values in record */
  2002   2137     u8 *abPK,                       /* Array of primary key flags, or NULL */
  2003   2138     sqlite3_value **apOut           /* Write values to this array */
  2004   2139   ){
  2005   2140     int i;                          /* Used to iterate through columns */
  2006         -  u8 *aRec = *paChange;           /* Cursor for the serialized record */
         2141  +  int rc = SQLITE_OK;
  2007   2142   
  2008         -  for(i=0; i<nCol; i++){
  2009         -    int eType;
         2143  +  for(i=0; i<nCol && rc==SQLITE_OK; i++){
         2144  +    int eType = 0;                /* Type of value (SQLITE_NULL, TEXT etc.) */
  2010   2145       if( abPK && abPK[i]==0 ) continue;
  2011         -    eType = *aRec++;              /* Type of value (SQLITE_NULL, TEXT etc.) */
         2146  +    rc = sessionInputBuffer(pIn, 9);
         2147  +    if( rc==SQLITE_OK ){
         2148  +      eType = pIn->aChangeset[pIn->iNext++];
         2149  +    }
         2150  +
  2012   2151       assert( !apOut || apOut[i]==0 );
  2013   2152       if( eType ){
  2014   2153         if( apOut ){
  2015   2154           apOut[i] = sqlite3ValueNew(0);
  2016         -        if( !apOut[i] ) return SQLITE_NOMEM;
         2155  +        if( !apOut[i] ) rc = SQLITE_NOMEM;
  2017   2156         }
         2157  +    }
  2018   2158   
         2159  +    if( rc==SQLITE_OK ){
         2160  +      u8 *aVal = &pIn->aChangeset[pIn->iNext];
  2019   2161         if( eType==SQLITE_TEXT || eType==SQLITE_BLOB ){
  2020   2162           int nByte;
  2021         -        aRec += sessionVarintGet(aRec, &nByte);
  2022         -        if( apOut ){
         2163  +        pIn->iNext += sessionVarintGet(aVal, &nByte);
         2164  +        rc = sessionInputBuffer(pIn, nByte);
         2165  +        if( apOut && rc==SQLITE_OK ){
         2166  +          u8 *aRec = &pIn->aChangeset[pIn->iNext];
  2023   2167             u8 enc = (eType==SQLITE_TEXT ? SQLITE_UTF8 : 0);
  2024   2168             sqlite3ValueSetStr(apOut[i], nByte, (char *)aRec, enc, SQLITE_STATIC);
  2025   2169           }
  2026         -        aRec += nByte;
         2170  +        pIn->iNext += nByte;
  2027   2171         }
  2028   2172         if( eType==SQLITE_INTEGER || eType==SQLITE_FLOAT ){
  2029   2173           if( apOut ){
  2030         -          sqlite3_int64 v = sessionGetI64(aRec);
         2174  +          sqlite3_int64 v = sessionGetI64(aVal);
  2031   2175             if( eType==SQLITE_INTEGER ){
  2032   2176               sqlite3VdbeMemSetInt64(apOut[i], v);
  2033   2177             }else{
  2034   2178               double d;
  2035   2179               memcpy(&d, &v, 8);
  2036   2180               sqlite3VdbeMemSetDouble(apOut[i], d);
  2037   2181             }
  2038   2182           }
  2039         -        aRec += 8;
  2040         -      }
  2041         -    }
  2042         -  }
  2043         -
  2044         -  *paChange = aRec;
  2045         -  return SQLITE_OK;
         2183  +        pIn->iNext += 8;
         2184  +      }
         2185  +    }
         2186  +  }
         2187  +
         2188  +  return rc;
         2189  +}
         2190  +
         2191  +/*
         2192  +** The input pointer currently points to the second byte of a table-header.
         2193  +** Specifically, to the following:
         2194  +**
         2195  +**   + number of columns in table (varint)
         2196  +**   + array of PK flags (1 byte per column),
         2197  +**   + table name (nul terminated).
         2198  +**
         2199  +** This function ensures that all of the above is present in the input 
         2200  +** buffer (i.e. that it can be accessed without any calls to xInput()).
         2201  +** If successful, SQLITE_OK is returned. Otherwise, an SQLite error code.
         2202  +** The input pointer is not moved.
         2203  +*/
         2204  +static int sessionChangesetBufferTblhdr(SessionInput *pIn, int *pnByte){
         2205  +  int rc = SQLITE_OK;
         2206  +  int nCol = 0;
         2207  +  int iIn = pIn->iNext;
         2208  +
         2209  +  rc = sessionInputBuffer(pIn, 9);
         2210  +  if( rc==SQLITE_OK ){
         2211  +    iIn += sessionVarintGet(&pIn->aChangeset[iIn], &nCol);
         2212  +    rc = sessionInputBuffer(pIn, nCol+100);
         2213  +    iIn += nCol;
         2214  +  }
         2215  +  while( rc==SQLITE_OK ){
         2216  +    while( iIn<pIn->nChangeset && pIn->aChangeset[iIn] ) iIn++;
         2217  +    if( pIn->aChangeset[iIn]==0 ) break;
         2218  +    rc = sessionInputBuffer(pIn, 100);
         2219  +  }
         2220  +  if( pnByte ) *pnByte = (iIn+1 - pIn->iNext);
         2221  +  return rc;
         2222  +}
         2223  +
         2224  +/*
         2225  +** The input pointer currently points to the second byte of a table-header.
         2226  +** Specifically, to the following:
         2227  +**
         2228  +**   + number of columns in table (varint)
         2229  +**   + array of PK flags (1 byte per column),
         2230  +**   + table name (nul terminated).
         2231  +*/
         2232  +static int sessionChangesetReadTblhdr(sqlite3_changeset_iter *p){
         2233  +  int rc;
         2234  +  int nCopy;
         2235  +  assert( p->rc==SQLITE_OK );
         2236  +
         2237  +  rc = sessionChangesetBufferTblhdr(&p->in, &nCopy);
         2238  +  if( rc==SQLITE_OK ){
         2239  +    int nByte;
         2240  +    int nVarint;
         2241  +    nVarint = sessionVarintGet(&p->in.aChangeset[p->in.iNext], &p->nCol);
         2242  +    nCopy -= nVarint;
         2243  +    p->in.iNext += nVarint;
         2244  +    nByte = p->nCol * sizeof(sqlite3_value*) * 2 + nCopy;
         2245  +    p->tblhdr.nBuf = 0;
         2246  +    sessionBufferGrow(&p->tblhdr, nByte, &rc);
         2247  +  }
         2248  +
         2249  +  if( rc==SQLITE_OK ){
         2250  +    int iPK = sizeof(sqlite3_value*)*p->nCol*2;
         2251  +    memset(p->tblhdr.aBuf, 0, iPK);
         2252  +    memcpy(&p->tblhdr.aBuf[iPK], &p->in.aChangeset[p->in.iNext], nCopy);
         2253  +    p->in.iNext += nCopy;
         2254  +  }
         2255  +
         2256  +  p->apValue = (sqlite3_value**)p->tblhdr.aBuf;
         2257  +  p->abPK = (u8*)&p->apValue[p->nCol*2];
         2258  +  p->zTab = (char*)&p->abPK[p->nCol];
         2259  +  return (p->rc = rc);
  2046   2260   }
  2047   2261   
  2048   2262   /*
  2049   2263   ** Advance the changeset iterator to the next change.
  2050   2264   **
  2051   2265   ** If both paRec and pnRec are NULL, then this function works like the public
  2052   2266   ** API sqlite3changeset_next(). If SQLITE_ROW is returned, then the
................................................................................
  2062   2276   ** changes in the changeset.
  2063   2277   */
  2064   2278   static int sessionChangesetNext(
  2065   2279     sqlite3_changeset_iter *p,      /* Changeset iterator */
  2066   2280     u8 **paRec,                     /* If non-NULL, store record pointer here */
  2067   2281     int *pnRec                      /* If non-NULL, store size of record here */
  2068   2282   ){
  2069         -  u8 *aChange;
  2070   2283     int i;
         2284  +  u8 op;
  2071   2285   
         2286  +  assert( paRec==0 || p->in.xInput==0 ); /* fixme! */
  2072   2287     assert( (paRec==0 && pnRec==0) || (paRec && pnRec) );
  2073   2288   
  2074   2289     /* If the iterator is in the error-state, return immediately. */
  2075   2290     if( p->rc!=SQLITE_OK ) return p->rc;
  2076   2291   
  2077   2292     /* Free the current contents of p->apValue[], if any. */
  2078   2293     if( p->apValue ){
  2079   2294       for(i=0; i<p->nCol*2; i++){
  2080   2295         sqlite3ValueFree(p->apValue[i]);
  2081   2296       }
  2082   2297       memset(p->apValue, 0, sizeof(sqlite3_value*)*p->nCol*2);
  2083   2298     }
         2299  +
         2300  +  /* Make sure the buffer contains at least 10 bytes of input data, or all
         2301  +  ** remaining data if there are less than 10 bytes available. This is
         2302  +  ** sufficient either for the 'T' or 'P' byte and the varint that follows
         2303  +  ** it, or for the two single byte values otherwise. */
         2304  +  p->rc = sessionInputBuffer(&p->in, 2);
         2305  +  if( p->rc!=SQLITE_OK ) return p->rc;
  2084   2306   
  2085   2307     /* If the iterator is already at the end of the changeset, return DONE. */
  2086         -  if( p->pNext>=&p->aChangeset[p->nChangeset] ){
         2308  +  if( p->in.iNext>=p->in.nChangeset ){
  2087   2309       return SQLITE_DONE;
  2088   2310     }
  2089         -  aChange = p->pNext;
  2090   2311   
  2091         -  if( aChange[0]=='T' || aChange[0]=='P' ){
  2092         -    int nByte;                    /* Bytes to allocate for apValue */
  2093         -    p->bPatchset = (aChange[0]=='P');
  2094         -    aChange++;
  2095         -    aChange += sessionVarintGet(aChange, &p->nCol);
  2096         -    p->abPK = (u8 *)aChange;
  2097         -    aChange += p->nCol;
  2098         -    p->zTab = (char *)aChange;
  2099         -    aChange += (sqlite3Strlen30((char *)aChange) + 1);
  2100         -    
  2101         -    if( paRec==0 ){
  2102         -      sqlite3_free(p->apValue);
  2103         -      nByte = sizeof(sqlite3_value *) * p->nCol * 2;
  2104         -      p->apValue = (sqlite3_value **)sqlite3_malloc(nByte);
  2105         -      if( !p->apValue ){
  2106         -        return (p->rc = SQLITE_NOMEM);
  2107         -      }
  2108         -      memset(p->apValue, 0, sizeof(sqlite3_value*)*p->nCol*2);
  2109         -    }
         2312  +  op = p->in.aChangeset[p->in.iNext++];
         2313  +  if( op=='T' || op=='P' ){
         2314  +    p->bPatchset = (op=='P');
         2315  +    if( sessionChangesetReadTblhdr(p) ) return p->rc;
         2316  +    if( (p->rc = sessionInputBuffer(&p->in, 2)) ) return p->rc;
         2317  +    op = p->in.aChangeset[p->in.iNext++];
  2110   2318     }
  2111   2319   
  2112         -  p->op = *(aChange++);
  2113         -  p->bIndirect = *(aChange++);
         2320  +  p->op = op;
         2321  +  p->bIndirect = p->in.aChangeset[p->in.iNext++];
  2114   2322     if( p->op!=SQLITE_UPDATE && p->op!=SQLITE_DELETE && p->op!=SQLITE_INSERT ){
  2115   2323       return (p->rc = SQLITE_CORRUPT);
  2116   2324     }
  2117   2325   
  2118         -  if( paRec ){ *paRec = aChange; }
         2326  +  if( paRec ){ *paRec = &p->in.aChangeset[p->in.iNext]; }
  2119   2327   
  2120   2328     /* If this is an UPDATE or DELETE, read the old.* record. */
  2121   2329     if( p->op!=SQLITE_INSERT && (p->bPatchset==0 || p->op==SQLITE_DELETE) ){
  2122   2330       u8 *abPK = p->bPatchset ? p->abPK : 0;
  2123         -    p->rc = sessionReadRecord(&aChange, p->nCol, abPK, paRec?0:p->apValue);
         2331  +    p->rc = sessionReadRecord(&p->in, p->nCol, abPK, paRec?0:p->apValue);
  2124   2332       if( p->rc!=SQLITE_OK ) return p->rc;
  2125   2333     }
  2126   2334   
  2127   2335     /* If this is an INSERT or UPDATE, read the new.* record. */
  2128   2336     if( p->op!=SQLITE_DELETE ){
  2129   2337       sqlite3_value **apOut = (paRec ? 0 : &p->apValue[p->nCol]);
  2130         -    p->rc = sessionReadRecord(&aChange, p->nCol, 0, apOut);
         2338  +    p->rc = sessionReadRecord(&p->in, p->nCol, 0, apOut);
  2131   2339       if( p->rc!=SQLITE_OK ) return p->rc;
  2132   2340     }
  2133   2341   
  2134         -  if( pnRec ){ 
  2135         -    *pnRec = (int)(aChange - *paRec); 
         2342  +  if( pnRec ){
         2343  +    *pnRec = (int)(&p->in.aChangeset[p->in.iNext] - *paRec);
  2136   2344     }else if( p->bPatchset && p->op==SQLITE_UPDATE ){
  2137   2345       /* If this is an UPDATE that is part of a patchset, then all PK and
  2138   2346       ** modified fields are present in the new.* record. The old.* record
  2139   2347       ** is currently completely empty. This block shifts the PK fields from
  2140   2348       ** new.* to old.*, to accommodate the code that reads these arrays.  */
  2141   2349       int i;
  2142   2350       for(i=0; i<p->nCol; i++){
................................................................................
  2144   2352         assert( p->abPK[i]==0 || p->apValue[i+p->nCol] );
  2145   2353         if( p->abPK[i] ){
  2146   2354           p->apValue[i] = p->apValue[i+p->nCol];
  2147   2355           p->apValue[i+p->nCol] = 0;
  2148   2356         }
  2149   2357       }
  2150   2358     }
  2151         -  p->pNext = aChange;
         2359  +
  2152   2360     return SQLITE_ROW;
  2153   2361   }
  2154   2362   
  2155   2363   /*
  2156   2364   ** Advance an iterator created by sqlite3changeset_start() to the next
  2157   2365   ** change in the changeset. This function may return SQLITE_ROW, SQLITE_DONE
  2158   2366   ** or SQLITE_CORRUPT.
................................................................................
  2317   2525   */
  2318   2526   int sqlite3changeset_finalize(sqlite3_changeset_iter *p){
  2319   2527     int i;                          /* Used to iterate through p->apValue[] */
  2320   2528     int rc = p->rc;                 /* Return code */
  2321   2529     if( p->apValue ){
  2322   2530       for(i=0; i<p->nCol*2; i++) sqlite3ValueFree(p->apValue[i]);
  2323   2531     }
  2324         -  sqlite3_free(p->apValue);
         2532  +  sqlite3_free(p->tblhdr.aBuf);
  2325   2533     sqlite3_free(p);
  2326   2534     return rc;
  2327   2535   }
  2328   2536   
  2329   2537   /*
  2330   2538   ** Invert a changeset object.
  2331   2539   */
................................................................................
  2335   2543     int *pnInverted,                /* OUT: Number of bytes in output changeset */
  2336   2544     void **ppInverted               /* OUT: Inverse of pChangeset */
  2337   2545   ){
  2338   2546     int rc = SQLITE_OK;             /* Return value */
  2339   2547     u8 *aOut;
  2340   2548     u8 *aIn;
  2341   2549     int i;
         2550  +  SessionInput sInput;
  2342   2551     int nCol = 0;                   /* Number of cols in current table */
  2343   2552     u8 *abPK = 0;                   /* PK array for current table */
  2344   2553     sqlite3_value **apVal = 0;      /* Space for values for UPDATE inversion */
         2554  +  SessionBuffer sPK = {0, 0, 0};  /* PK array for current table */
  2345   2555   
  2346   2556     /* Zero the output variables in case an error occurs. */
  2347   2557     *ppInverted = 0;
  2348   2558     *pnInverted = 0;
  2349   2559     if( nChangeset==0 ) return SQLITE_OK;
  2350   2560   
         2561  +  /* Set up the input stream */
         2562  +  memset(&sInput, 0, sizeof(SessionInput));
         2563  +  sInput.nChangeset = nChangeset;
         2564  +  sInput.aChangeset = (u8*)pChangeset;
         2565  +
  2351   2566     aOut = (u8 *)sqlite3_malloc(nChangeset);
  2352   2567     if( !aOut ) return SQLITE_NOMEM;
  2353   2568     aIn = (u8 *)pChangeset;
  2354   2569   
  2355   2570     i = 0;
  2356   2571     while( i<nChangeset ){
  2357         -    u8 eType = aIn[i];
         2572  +    u8 eType;
         2573  +    if( (rc = sessionInputBuffer(&sInput, 2)) ) goto finished_invert;
         2574  +    eType = sInput.aChangeset[sInput.iNext];
  2358   2575       switch( eType ){
  2359   2576         case 'T': {
  2360   2577           /* A 'table' record consists of:
  2361   2578           **
  2362   2579           **   * A constant 'T' character,
  2363   2580           **   * Number of columns in said table (a varint),
  2364         -        **   * An array of nCol bytes (abPK),
         2581  +        **   * An array of nCol bytes (sPK),
  2365   2582           **   * A nul-terminated table name.
  2366   2583           */
  2367         -        int nByte = 1 + sessionVarintGet(&aIn[i+1], &nCol);
  2368         -        abPK = &aIn[i+nByte];
  2369         -        nByte += nCol;
  2370         -        nByte += 1 + sqlite3Strlen30((char *)&aIn[i+nByte]);
  2371         -        memcpy(&aOut[i], &aIn[i], nByte);
  2372         -        i += nByte;
         2584  +        int nByte;
         2585  +        int nVarint;
         2586  +        int iNext = sInput.iNext;
         2587  +        sInput.iNext++;
         2588  +        if( (rc = sessionChangesetBufferTblhdr(&sInput, &nByte)) ){
         2589  +          goto finished_invert;
         2590  +        }
         2591  +        nVarint = sessionVarintGet(&sInput.aChangeset[iNext+1], &nCol);
         2592  +        sPK.nBuf = 0;
         2593  +        sessionAppendBlob(&sPK, &sInput.aChangeset[iNext+1+nVarint], nCol, &rc);
         2594  +        if( rc ) goto finished_invert;
         2595  +        sInput.iNext += nByte;
         2596  +        memcpy(&aOut[i], &sInput.aChangeset[iNext], nByte+1);
         2597  +        i += nByte+1;
  2373   2598           sqlite3_free(apVal);
  2374   2599           apVal = 0;
         2600  +        abPK = sPK.aBuf;
  2375   2601           break;
  2376   2602         }
  2377   2603   
  2378   2604         case SQLITE_INSERT:
  2379   2605         case SQLITE_DELETE: {
         2606  +        int iStart;
  2380   2607           int nByte;
  2381         -        u8 *aEnd = &aIn[i+2];
  2382         -
  2383         -        sessionReadRecord(&aEnd, nCol, 0, 0);
         2608  +        sInput.iNext += 2;
         2609  +        iStart = sInput.iNext;
         2610  +        sessionReadRecord(&sInput, nCol, 0, 0);
  2384   2611           aOut[i] = (eType==SQLITE_DELETE ? SQLITE_INSERT : SQLITE_DELETE);
  2385         -        aOut[i+1] = aIn[i+1];
  2386         -        nByte = (int)(aEnd - &aIn[i+2]);
  2387         -        memcpy(&aOut[i+2], &aIn[i+2], nByte);
         2612  +        aOut[i+1] = aIn[i+1];               /* indirect-flag */
         2613  +        nByte = sInput.iNext - iStart;
         2614  +        memcpy(&aOut[i+2], &sInput.aChangeset[iStart], nByte);
  2388   2615           i += 2 + nByte;
  2389   2616           break;
  2390   2617         }
  2391   2618   
  2392   2619         case SQLITE_UPDATE: {
  2393   2620           int iCol;
  2394   2621           int nWrite = 0;
  2395         -        u8 *aEnd = &aIn[i+2];
  2396   2622   
  2397   2623           if( 0==apVal ){
  2398   2624             apVal = (sqlite3_value **)sqlite3_malloc(sizeof(apVal[0])*nCol*2);
  2399   2625             if( 0==apVal ){
  2400   2626               rc = SQLITE_NOMEM;
  2401   2627               goto finished_invert;
  2402   2628             }
  2403   2629             memset(apVal, 0, sizeof(apVal[0])*nCol*2);
  2404   2630           }
  2405   2631   
  2406         -        /* Read the old.* and new.* records for the update change. */
  2407         -        rc = sessionReadRecord(&aEnd, nCol, 0, &apVal[0]);
  2408         -        if( rc==SQLITE_OK ){
  2409         -          rc = sessionReadRecord(&aEnd, nCol, 0, &apVal[nCol]);
  2410         -        }
  2411         -
  2412   2632           /* Write the header for the new UPDATE change. Same as the original. */
  2413   2633           aOut[i] = SQLITE_UPDATE;
  2414         -        aOut[i+1] = aIn[i+1];
         2634  +        aOut[i+1] = sInput.aChangeset[sInput.iNext+1];
  2415   2635           nWrite = 2;
         2636  +
         2637  +        /* Read the old.* and new.* records for the update change. */
         2638  +        sInput.iNext += 2;
         2639  +        rc = sessionReadRecord(&sInput, nCol, 0, &apVal[0]);
         2640  +        if( rc==SQLITE_OK ){
         2641  +          rc = sessionReadRecord(&sInput, nCol, 0, &apVal[nCol]);
         2642  +        }
  2416   2643   
  2417   2644           /* Write the new old.* record. Consists of the PK columns from the
  2418   2645           ** original old.* record, and the other values from the original
  2419   2646           ** new.* record. */
  2420   2647           for(iCol=0; rc==SQLITE_OK && iCol<nCol; iCol++){
  2421   2648             sqlite3_value *pVal = apVal[iCol + (abPK[iCol] ? 0 : nCol)];
  2422   2649             rc = sessionSerializeValue(&aOut[i+nWrite], pVal, &nWrite);
................................................................................
  2435   2662           }
  2436   2663           memset(apVal, 0, sizeof(apVal[0])*nCol*2);
  2437   2664           if( rc!=SQLITE_OK ){
  2438   2665             goto finished_invert;
  2439   2666           }
  2440   2667   
  2441   2668           i += nWrite;
  2442         -        assert( &aIn[i]==aEnd );
         2669  +        assert( i==sInput.iNext );
  2443   2670           break;
  2444   2671         }
  2445   2672   
  2446   2673         default:
  2447   2674           rc = SQLITE_CORRUPT;
  2448   2675           goto finished_invert;
  2449   2676       }
................................................................................
  2454   2681     *ppInverted = (void *)aOut;
  2455   2682   
  2456   2683    finished_invert:
  2457   2684     if( rc!=SQLITE_OK ){
  2458   2685       sqlite3_free(aOut);
  2459   2686     }
  2460   2687     sqlite3_free(apVal);
         2688  +  sqlite3_free(sPK.aBuf);
  2461   2689     return rc;
  2462   2690   }
  2463   2691   
  2464   2692   typedef struct SessionApplyCtx SessionApplyCtx;
  2465   2693   struct SessionApplyCtx {
  2466   2694     sqlite3 *db;
  2467   2695     sqlite3_stmt *pDelete;          /* DELETE statement */
................................................................................
  3272   3500         pNew->bIndirect = (bIndirect && pExist->bIndirect);
  3273   3501         aCsr = pNew->aRecord = (u8 *)&pNew[1];
  3274   3502   
  3275   3503         if( op1==SQLITE_INSERT ){             /* INSERT + UPDATE */
  3276   3504           u8 *a1 = aRec;
  3277   3505           assert( op2==SQLITE_UPDATE );
  3278   3506           pNew->op = SQLITE_INSERT;
  3279         -        if( bPatchset==0 ) sessionReadRecord(&a1, pTab->nCol, 0, 0);
         3507  +        if( bPatchset==0 ) sessionSkipRecord(&a1, pTab->nCol);
  3280   3508           sessionMergeRecord(&aCsr, pTab->nCol, aExist, a1);
  3281   3509         }else if( op1==SQLITE_DELETE ){       /* DELETE + INSERT */
  3282   3510           assert( op2==SQLITE_INSERT );
  3283   3511           pNew->op = SQLITE_UPDATE;
  3284   3512           if( 0==sessionMergeUpdate(&aCsr, pTab, bPatchset, aExist, 0, aRec, 0) ){
  3285   3513             sqlite3_free(pNew);
  3286   3514             pNew = 0;
  3287   3515           }
  3288   3516         }else if( op2==SQLITE_UPDATE ){       /* UPDATE + UPDATE */
  3289   3517           u8 *a1 = aExist;
  3290   3518           u8 *a2 = aRec;
  3291   3519           assert( op1==SQLITE_UPDATE );
  3292   3520           if( bPatchset==0 ){
  3293         -          sessionReadRecord(&a1, pTab->nCol, 0, 0);
  3294         -          sessionReadRecord(&a2, pTab->nCol, 0, 0);
         3521  +          sessionSkipRecord(&a1, pTab->nCol);
         3522  +          sessionSkipRecord(&a2, pTab->nCol);
  3295   3523           }
  3296   3524           pNew->op = SQLITE_UPDATE;
  3297   3525           if( 0==sessionMergeUpdate(&aCsr, pTab, bPatchset, aRec, aExist,a1,a2) ){
  3298   3526             sqlite3_free(pNew);
  3299   3527             pNew = 0;
  3300   3528           }
  3301   3529         }else{                                /* UPDATE + DELETE */
................................................................................
  3352   3580       assert( bPatchset==0 || bPatchset==1 );
  3353   3581       assert( pIter->bPatchset==0 || pIter->bPatchset==1 );
  3354   3582       if( pIter->bPatchset!=bPatchset ){
  3355   3583         rc = SQLITE_ERROR;
  3356   3584         break;
  3357   3585       }
  3358   3586   
  3359         -    assert( pIter->apValue==0 );
  3360   3587       sqlite3changeset_op(pIter, &zNew, &nCol, &op, &bIndirect);
  3361         -
  3362         -    assert( zNew>=(char *)pChangeset && zNew-nChangeset<((char *)pChangeset) );
  3363         -    assert( !pTab || pTab->zName-nChangeset<(char *)pChangeset );
  3364         -    assert( !pTab || zNew>=pTab->zName );
  3365         -
  3366         -    if( !pTab || zNew!=pTab->zName ){
         3588  +    if( !pTab || sqlite3_stricmp(zNew, pTab->zName) ){
  3367   3589         /* Search the list for a matching table */
  3368   3590         int nNew = (int)strlen(zNew);
  3369   3591         u8 *abPK;
  3370   3592   
  3371   3593         sqlite3changeset_pk(pIter, &abPK, 0);
  3372   3594         for(pTab = *ppTabList; pTab; pTab=pTab->pNext){
  3373   3595           if( 0==sqlite3_strnicmp(pTab->zName, zNew, nNew+1) ) break;
  3374   3596         }
  3375   3597         if( !pTab ){
  3376         -        pTab = sqlite3_malloc(sizeof(SessionTable));
         3598  +        pTab = sqlite3_malloc(sizeof(SessionTable) + nCol + nNew+1);
  3377   3599           if( !pTab ){
  3378   3600             rc = SQLITE_NOMEM;
  3379   3601             break;
  3380   3602           }
  3381   3603           memset(pTab, 0, sizeof(SessionTable));
  3382   3604           pTab->pNext = *ppTabList;
  3383         -        pTab->abPK = abPK;
  3384   3605           pTab->nCol = nCol;
         3606  +        pTab->abPK = (u8*)&pTab[1];
         3607  +        memcpy(pTab->abPK, abPK, nCol);
         3608  +        pTab->zName = (char*)&pTab->abPK[nCol];
         3609  +        memcpy(pTab->zName, zNew, nNew+1);
  3385   3610           *ppTabList = pTab;
  3386   3611         }else if( pTab->nCol!=nCol || memcmp(pTab->abPK, abPK, nCol) ){
  3387   3612           rc = SQLITE_SCHEMA;
  3388   3613           break;
  3389   3614         }
  3390         -      pTab->zName = (char *)zNew;
  3391   3615       }
  3392   3616   
  3393   3617       if( sessionGrowHash(bPatchset, pTab) ){
  3394   3618         rc = SQLITE_NOMEM;
  3395   3619         break;
  3396   3620       }
  3397   3621       iHash = sessionChangeHash(

Changes to ext/session/sqlite3session.h.

   268    268   ** resulting changeset will contain an UPDATE change that updates both fields.
   269    269   */
   270    270   int sqlite3session_changeset(
   271    271     sqlite3_session *pSession,      /* Session object */
   272    272     int *pnChangeset,               /* OUT: Size of buffer at *ppChangeset */
   273    273     void **ppChangeset              /* OUT: Buffer containing changeset */
   274    274   );
          275  +
          276  +
          277  +/*
          278  +** This function is similar to sqlite3session_changeset(), except that instead
          279  +** of storing the output changeset in a buffer obtained from sqlite3_malloc()
          280  +** it invokes the supplied xOutput() callback zero or more times to stream the
          281  +** changeset to the application. This is useful in order to avoid large memory
          282  +** allocations when working with very large changesets.
          283  +**
          284  +** The first parameter passed to each call to the xOutput callback is a copy
          285  +** of the pOut parameter passed to this function. The following two parameters
          286  +** are a pointer to the buffer containing the next chunk of the output changeset
          287  +** and the size of that buffer in bytes.
          288  +**
          289  +** If the data is successfully processed by the xOutput callback, it should
          290  +** return SQLITE_OK. Or, if an error occurs, some other SQLite error code. In
          291  +** this case the sqlite3session_changeset_str() call is abandoned immediately
          292  +** and returns a copy of the xOutput return code.
          293  +*/
          294  +int sqlite3session_changeset_str(
          295  +  sqlite3_session *pSession,
          296  +  int (*xOutput)(void *pOut, const void *pData, int nData),
          297  +  void *pOut
          298  +);
          299  +
   275    300   
   276    301   /*
   277    302   ** CAPI3REF: Generate A Patchset From A Session Object
   278    303   **
   279    304   ** The differences between a patchset and a changeset are that:
   280    305   **
   281    306   ** <ul>
................................................................................
   297    322   ** in the same way as for changesets.
   298    323   */
   299    324   int sqlite3session_patchset(
   300    325     sqlite3_session *pSession,      /* Session object */
   301    326     int *pnPatchset,                /* OUT: Size of buffer at *ppChangeset */
   302    327     void **ppPatchset               /* OUT: Buffer containing changeset */
   303    328   );
          329  +
          330  +/*
          331  +** Streaming version of sqlite3session_patchset().
          332  +*/
          333  +int sqlite3session_patchset_str(
          334  +  sqlite3_session *pSession,
          335  +  int (*xOutput)(void *pOut, const void *pData, int nData),
          336  +  void *pOut
          337  +);
   304    338   
   305    339   /*
   306    340   ** CAPI3REF: Test if a changeset has recorded any changes.
   307    341   **
   308    342   ** Return non-zero if no changes to attached tables have been recorded by 
   309    343   ** the session object passed as the first argument. Otherwise, if one or 
   310    344   ** more changes have been recorded, return zero.
................................................................................
   354    388   */
   355    389   int sqlite3changeset_start(
   356    390     sqlite3_changeset_iter **pp,    /* OUT: New changeset iterator handle */
   357    391     int nChangeset,                 /* Size of changeset blob in bytes */
   358    392     void *pChangeset                /* Pointer to blob containing changeset */
   359    393   );
   360    394   
          395  +
          396  +/*
          397  +** This function is similar to sqlite3changeset_start(), except that instead
          398  +** of reading data from a single buffer, it requests it one chunk at a time
          399  +** from the application by invoking the supplied xInput() callback. The xInput()
          400  +** callback may be invoked at any time during the lifetime of the iterator.
          401  +**
          402  +** Each time the xInput callback is invoked, the first argument passed is a
          403  +** copy of the third parameter passed to this function. The second argument,
          404  +** pData, points to a buffer (*pnData) bytes in size. Assuming no error occurs
          405  +** the xInput method should copy up to (*pnData) bytes of data into the buffer
          406  +** and set (*pnData) to the actual number of bytes copied before returning
          407  +** SQLITE_OK. If the input is completely exhausted, (*pnData) should be set
          408  +** to zero to indicate this. Or, if an error occurs, an SQLite error code
          409  +** should be returned. In this case the iterator is put into an error state and
          410  +** all subsequent calls to iterator methods return a copy of the xInput error
          411  +** code.
          412  +*/
          413  +int sqlite3changeset_start_str(
          414  +  sqlite3_changeset_iter **pp,
          415  +  int (*xInput)(void *pIn, void *pData, int *pnData),
          416  +  void *pIn
          417  +);
          418  +
   361    419   /*
   362    420   ** CAPI3REF: Advance A Changeset Iterator
   363    421   **
   364    422   ** This function may only be used with iterators created by function
   365    423   ** [sqlite3changeset_start()]. If it is called on an iterator passed to
   366    424   ** a conflict-handler callback by [sqlite3changeset_apply()], SQLITE_MISUSE
   367    425   ** is returned and the call has no effect.

Changes to ext/session/test_session.c.

     9      9   
    10     10   typedef struct TestSession TestSession;
    11     11   struct TestSession {
    12     12     sqlite3_session *pSession;
    13     13     Tcl_Interp *interp;
    14     14     Tcl_Obj *pFilterScript;
    15     15   };
           16  +
           17  +#define SESSION_STREAM_TCL_VAR "sqlite3session_streams"
           18  +
           19  +/*
           20  +** Attempt to find the global variable zVar within interpreter interp
           21  +** and extract a boolean value from it. Return this value.
           22  +**
           23  +** If the named variable cannot be found, or if it cannot be interpreted
           24  +** as a boolean, return 0.
           25  +*/
           26  +static int test_tcl_boolean(Tcl_Interp *interp, const char *zVar){
           27  +  Tcl_Obj *pObj;
           28  +  int bVal = 0;
           29  +  pObj = Tcl_ObjGetVar2(interp, Tcl_NewStringObj(zVar, -1), 0, TCL_GLOBAL_ONLY);
           30  +  if( pObj ) Tcl_GetBooleanFromObj(0, pObj, &bVal);
           31  +  return bVal;
           32  +}
    16     33   
    17     34   static int test_session_error(Tcl_Interp *interp, int rc){
    18     35     extern const char *sqlite3ErrName(int);
    19     36     Tcl_SetObjResult(interp, Tcl_NewStringObj(sqlite3ErrName(rc), -1));
    20     37     return TCL_ERROR;
    21     38   }
    22     39   
................................................................................
    39     56       /* printf("error: %s\n", Tcl_GetStringResult(p->interp)); */
    40     57       Tcl_BackgroundError(p->interp);
    41     58     }
    42     59     Tcl_DecrRefCount(pEval);
    43     60   
    44     61     return bRes;
    45     62   }
           63  +
           64  +struct TestSessionsBlob {
           65  +  void *p;
           66  +  int n;
           67  +};
           68  +typedef struct TestSessionsBlob TestSessionsBlob;
           69  +
           70  +static int testSessionsOutput(
           71  +  void *pCtx,
           72  +  const void *pData,
           73  +  int nData
           74  +){
           75  +  TestSessionsBlob *pBlob = (TestSessionsBlob*)pCtx;
           76  +  char *pNew;
           77  +
           78  +  assert( nData>0 );
           79  +  pNew = (char*)sqlite3_realloc(pBlob->p, pBlob->n + nData);
           80  +  if( pNew==0 ){
           81  +    return SQLITE_NOMEM;
           82  +  }
           83  +  pBlob->p = (void*)pNew;
           84  +  memcpy(&pNew[pBlob->n], pData, nData);
           85  +  pBlob->n += nData;
           86  +  return SQLITE_OK;
           87  +}
    46     88   
    47     89   /*
    48     90   ** Tclcmd:  $session attach TABLE
    49     91   **          $session changeset
    50     92   **          $session delete
    51     93   **          $session enable BOOL
    52     94   **          $session indirect INTEGER
           95  +**          $session patchset
    53     96   **          $session table_filter SCRIPT
    54     97   */
    55     98   static int test_session_cmd(
    56     99     void *clientData,
    57    100     Tcl_Interp *interp,
    58    101     int objc,
    59    102     Tcl_Obj *CONST objv[]
................................................................................
   101    144           return test_session_error(interp, rc);
   102    145         }
   103    146         break;
   104    147       }
   105    148   
   106    149       case 7:        /* patchset */
   107    150       case 1: {      /* changeset */
   108         -      int nChange;
   109         -      void *pChange;
   110         -      if( iSub==7 ){
   111         -        rc = sqlite3session_patchset(pSession, &nChange, &pChange);
          151  +      TestSessionsBlob o = {0, 0};
          152  +      if( test_tcl_boolean(interp, SESSION_STREAM_TCL_VAR) ){
          153  +        void *pCtx = (void*)&o;
          154  +        if( iSub==7 ){
          155  +          rc = sqlite3session_patchset_str(pSession, testSessionsOutput, pCtx);
          156  +        }else{
          157  +          rc = sqlite3session_changeset_str(pSession, testSessionsOutput, pCtx);
          158  +        }
   112    159         }else{
   113         -        rc = sqlite3session_changeset(pSession, &nChange, &pChange);
          160  +        if( iSub==7 ){
          161  +          rc = sqlite3session_patchset(pSession, &o.n, &o.p);
          162  +        }else{
          163  +          rc = sqlite3session_changeset(pSession, &o.n, &o.p);
          164  +        }
   114    165         }
   115    166         if( rc==SQLITE_OK ){
   116         -        Tcl_SetObjResult(interp, Tcl_NewByteArrayObj(pChange, nChange)); 
   117         -        sqlite3_free(pChange);
   118         -      }else{
          167  +        Tcl_SetObjResult(interp, Tcl_NewByteArrayObj(o.p, o.n)); 
          168  +      }
          169  +      sqlite3_free(o.p);
          170  +      if( rc!=SQLITE_OK ){
   119    171           return test_session_error(interp, rc);
   120    172         }
   121    173         break;
   122    174       }
   123    175   
   124    176       case 2:        /* delete */
   125    177         Tcl_DeleteCommand(interp, Tcl_GetString(objv[0]));

Changes to test/permutations.test.

   933    933   test_suite "session_eec" -description {
   934    934     All session module related tests with sqlite3_extended_result_codes() set. 
   935    935   } -files [
   936    936     glob -nocomplain $::testdir/../ext/session/*.test
   937    937   ] -dbconfig {
   938    938     sqlite3_extended_result_codes $::dbhandle 1
   939    939   }
          940  +
          941  +test_suite "session_str" -description {
          942  +  All session module related tests using the streaming APIs.
          943  +} -files [
          944  +  glob -nocomplain $::testdir/../ext/session/*.test
          945  +] -dbconfig {
          946  +  set ::sqlite3session_streams 1
          947  +}
   940    948   
   941    949   test_suite "no_optimization" -description {
   942    950     Run test scripts with optimizations disabled using the
   943    951     sqlite3_test_control(SQLITE_TESTCTRL_OPTIMIZATIONS) interface.
   944    952   } -files {
   945    953     where.test where2.test where3.test where4.test where5.test
   946    954     where6.test where7.test where8.test where9.test