Many hyperlinks are disabled.
Use anonymous login
to enable hyperlinks.
Overview
Comment: | Add streaming version of sqlite3changeset_apply(). Tests and fixes for the same and sqlite3changeset_start_str(). |
---|---|
Downloads: | Tarball | ZIP archive |
Timelines: | family | ancestors | descendants | both | sessions |
Files: | files | file ages | folders |
SHA1: |
b917fc146876f764442de08d5ec36e5b |
User & Date: | dan 2014-09-24 17:13:20.331 |
Context
2014-09-25
| ||
14:54 | Add streaming version of sqlite3changeset_invert() to sessions module. (check-in: 8ded6a4679 user: dan tags: sessions) | |
2014-09-24
| ||
17:13 | Add streaming version of sqlite3changeset_apply(). Tests and fixes for the same and sqlite3changeset_start_str(). (check-in: b917fc1468 user: dan tags: sessions) | |
2014-09-23
| ||
20:39 | Begin adding 'streaming' APIs to sessions module. This is a work in progress. (check-in: 3c7d3d950b user: dan tags: sessions) | |
Changes
Changes to ext/session/sqlite3session.c.
︙ | ︙ | |||
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | typedef struct SessionChange SessionChange; typedef struct SessionBuffer SessionBuffer; typedef struct SessionInput SessionInput; /* ** Minimum chunk size used by streaming versions of functions. */ #define SESSIONS_STR_CHUNK_SIZE 1024 /* ** Session handle structure. */ struct sqlite3_session { sqlite3 *db; /* Database handle session is attached to */ char *zDb; /* Name of database session is attached to */ | > > > > | 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | typedef struct SessionChange SessionChange; typedef struct SessionBuffer SessionBuffer; typedef struct SessionInput SessionInput; /* ** Minimum chunk size used by streaming versions of functions. */ #ifdef SQLITE_TEST #define SESSIONS_STR_CHUNK_SIZE 1 #else #define SESSIONS_STR_CHUNK_SIZE 1024 #endif /* ** Session handle structure. */ struct sqlite3_session { sqlite3 *db; /* Database handle session is attached to */ char *zDb; /* Name of database session is attached to */ |
︙ | ︙ | |||
47 48 49 50 51 52 53 | /* ** An object of this type is used internally as an abstraction for the ** input data read by changeset iterators. Input data may be supplied ** either as a single large buffer (sqlite3changeset_start()) or using ** a stream function (sqlite3changeset_start_str()). */ struct SessionInput { | | | | > | 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 | /* ** An object of this type is used internally as an abstraction for the ** input data read by changeset iterators. Input data may be supplied ** either as a single large buffer (sqlite3changeset_start()) or using ** a stream function (sqlite3changeset_start_str()). */ struct SessionInput { int iNext; /* Offset in aData[] of next change */ u8 *aData; /* Pointer to buffer containing changeset */ int nData; /* Number of bytes in aData */ SessionBuffer buf; /* Current read buffer */ int (*xInput)(void*, void*, int*); /* Input stream call (or NULL) */ void *pIn; /* First argument to xInput */ int bEof; /* Set to true after xInput finished */ }; /* |
︙ | ︙ | |||
2029 2030 2031 2032 2033 2034 2035 | *pp = 0; /* Allocate and initialize the iterator structure. */ nByte = sizeof(sqlite3_changeset_iter); pRet = (sqlite3_changeset_iter *)sqlite3_malloc(nByte); if( !pRet ) return SQLITE_NOMEM; memset(pRet, 0, sizeof(sqlite3_changeset_iter)); | | | | 2034 2035 2036 2037 2038 2039 2040 2041 2042 2043 2044 2045 2046 2047 2048 2049 | *pp = 0; /* Allocate and initialize the iterator structure. */ nByte = sizeof(sqlite3_changeset_iter); pRet = (sqlite3_changeset_iter *)sqlite3_malloc(nByte); if( !pRet ) return SQLITE_NOMEM; memset(pRet, 0, sizeof(sqlite3_changeset_iter)); pRet->in.aData = (u8 *)pChangeset; pRet->in.nData = nChangeset; pRet->in.xInput = xInput; pRet->in.pIn = pIn; pRet->in.iNext = 0; pRet->in.bEof = (xInput ? 0 : 1); /* Populate the output variable and return success. */ *pp = pRet; |
︙ | ︙ | |||
2070 2071 2072 2073 2074 2075 2076 | /* ** Ensure that there are at least nByte bytes available in the buffer. Or, ** if there are not nByte bytes remaining in the input, that all available ** data is in the buffer. ** ** Return an SQLite error code if an error occurs, or SQLITE_OK otherwise. */ | | > > > | > > > > > > | > > > > > > > > > > > > | 2075 2076 2077 2078 2079 2080 2081 2082 2083 2084 2085 2086 2087 2088 2089 2090 2091 2092 2093 2094 2095 2096 2097 2098 2099 2100 2101 2102 2103 2104 2105 2106 2107 2108 2109 2110 2111 2112 2113 | /* ** Ensure that there are at least nByte bytes available in the buffer. Or, ** if there are not nByte bytes remaining in the input, that all available ** data is in the buffer. ** ** Return an SQLite error code if an error occurs, or SQLITE_OK otherwise. */ static int sessionInputBuffer(SessionInput *pIn, int nByte){ int rc = SQLITE_OK; if( pIn->xInput ){ while( !pIn->bEof && (pIn->iNext+nByte)>=pIn->nData && rc==SQLITE_OK ){ int nNew = SESSIONS_STR_CHUNK_SIZE; if( pIn->iNext>=SESSIONS_STR_CHUNK_SIZE ){ int nMove = pIn->buf.nBuf - pIn->iNext; memmove(pIn->buf.aBuf, &pIn->buf.aBuf[pIn->iNext], nMove); pIn->buf.nBuf -= pIn->iNext; pIn->iNext = 0; } if( SQLITE_OK==sessionBufferGrow(&pIn->buf, nNew, &rc) ){ rc = pIn->xInput(pIn->pIn, &pIn->buf.aBuf[pIn->buf.nBuf], &nNew); if( nNew==0 ){ pIn->bEof = 1; }else{ pIn->buf.nBuf += nNew; } } pIn->aData = pIn->buf.aBuf; pIn->nData = pIn->buf.nBuf; } } return rc; } /* ** When this function is called, *ppRec points to the start of a record ** that contains nCol values. This function advances the pointer *ppRec |
︙ | ︙ | |||
2102 2103 2104 2105 2106 2107 2108 2109 2110 2111 2112 2113 2114 2115 | }else if( eType==SQLITE_INTEGER || eType==SQLITE_FLOAT ){ aRec += 8; } } *ppRec = aRec; } /* ** Deserialize a single record from a buffer in memory. See "RECORD FORMAT" ** for details. ** ** When this function is called, *paChange points to the start of the record ** to deserialize. Assuming no error occurs, *paChange is set to point to | > > > > > > > > > > > > > > > > > > > | 2128 2129 2130 2131 2132 2133 2134 2135 2136 2137 2138 2139 2140 2141 2142 2143 2144 2145 2146 2147 2148 2149 2150 2151 2152 2153 2154 2155 2156 2157 2158 2159 2160 | }else if( eType==SQLITE_INTEGER || eType==SQLITE_FLOAT ){ aRec += 8; } } *ppRec = aRec; } /* ** This function sets the value of the sqlite3_value object passed as the ** first argument to a copy of the string or blob held in the aData[] ** buffer. SQLITE_OK is returned if successful, or SQLITE_NOMEM if an OOM ** error occurs. */ static int sessionValueSetStr( sqlite3_value *pVal, /* Set the value of this object */ u8 *aData, /* Buffer containing string or blob data */ int nData, /* Size of buffer aData[] in bytes */ u8 enc /* String encoding (0 for blobs) */ ){ u8 *aCopy = sqlite3_malloc(nData); if( aCopy==0 ) return SQLITE_NOMEM; memcpy(aCopy, aData, nData); sqlite3ValueSetStr(pVal, nData, (char*)aCopy, enc, sqlite3_free); return SQLITE_OK; } /* ** Deserialize a single record from a buffer in memory. See "RECORD FORMAT" ** for details. ** ** When this function is called, *paChange points to the start of the record ** to deserialize. Assuming no error occurs, *paChange is set to point to |
︙ | ︙ | |||
2141 2142 2143 2144 2145 2146 2147 | int rc = SQLITE_OK; for(i=0; i<nCol && rc==SQLITE_OK; i++){ int eType = 0; /* Type of value (SQLITE_NULL, TEXT etc.) */ if( abPK && abPK[i]==0 ) continue; rc = sessionInputBuffer(pIn, 9); if( rc==SQLITE_OK ){ | | | < | | 2186 2187 2188 2189 2190 2191 2192 2193 2194 2195 2196 2197 2198 2199 2200 2201 2202 2203 2204 2205 2206 2207 2208 2209 2210 2211 2212 2213 2214 2215 2216 2217 2218 2219 | int rc = SQLITE_OK; for(i=0; i<nCol && rc==SQLITE_OK; i++){ int eType = 0; /* Type of value (SQLITE_NULL, TEXT etc.) */ if( abPK && abPK[i]==0 ) continue; rc = sessionInputBuffer(pIn, 9); if( rc==SQLITE_OK ){ eType = pIn->aData[pIn->iNext++]; } assert( !apOut || apOut[i]==0 ); if( eType ){ if( apOut ){ apOut[i] = sqlite3ValueNew(0); if( !apOut[i] ) rc = SQLITE_NOMEM; } } if( rc==SQLITE_OK ){ u8 *aVal = &pIn->aData[pIn->iNext]; if( eType==SQLITE_TEXT || eType==SQLITE_BLOB ){ int nByte; pIn->iNext += sessionVarintGet(aVal, &nByte); rc = sessionInputBuffer(pIn, nByte); if( apOut && rc==SQLITE_OK ){ u8 enc = (eType==SQLITE_TEXT ? SQLITE_UTF8 : 0); rc = sessionValueSetStr(apOut[i],&pIn->aData[pIn->iNext],nByte,enc); } pIn->iNext += nByte; } if( eType==SQLITE_INTEGER || eType==SQLITE_FLOAT ){ if( apOut ){ sqlite3_int64 v = sessionGetI64(aVal); if( eType==SQLITE_INTEGER ){ |
︙ | ︙ | |||
2200 2201 2202 2203 2204 2205 2206 | ** buffer (i.e. that it can be accessed without any calls to xInput()). ** If successful, SQLITE_OK is returned. Otherwise, an SQLite error code. ** The input pointer is not moved. */ static int sessionChangesetBufferTblhdr(SessionInput *pIn, int *pnByte){ int rc = SQLITE_OK; int nCol = 0; | | | | | > | > > | | | | | | 2244 2245 2246 2247 2248 2249 2250 2251 2252 2253 2254 2255 2256 2257 2258 2259 2260 2261 2262 2263 2264 2265 2266 2267 2268 2269 2270 2271 2272 2273 2274 2275 2276 2277 2278 2279 2280 2281 2282 2283 2284 2285 2286 2287 2288 2289 2290 2291 2292 2293 2294 2295 2296 2297 2298 2299 2300 2301 2302 2303 2304 2305 2306 | ** buffer (i.e. that it can be accessed without any calls to xInput()). ** If successful, SQLITE_OK is returned. Otherwise, an SQLite error code. ** The input pointer is not moved. */ static int sessionChangesetBufferTblhdr(SessionInput *pIn, int *pnByte){ int rc = SQLITE_OK; int nCol = 0; int nRead = 0; rc = sessionInputBuffer(pIn, 9); if( rc==SQLITE_OK ){ nRead += sessionVarintGet(&pIn->aData[pIn->iNext + nRead], &nCol); rc = sessionInputBuffer(pIn, nRead+nCol+100); nRead += nCol; } while( rc==SQLITE_OK ){ while( (pIn->iNext + nRead)<pIn->nData && pIn->aData[pIn->iNext + nRead] ){ nRead++; } if( pIn->aData[pIn->iNext + nRead]==0 ) break; rc = sessionInputBuffer(pIn, nRead + 100); } if( pnByte ) *pnByte = nRead+1; return rc; } /* ** The input pointer currently points to the second byte of a table-header. ** Specifically, to the following: ** ** + number of columns in table (varint) ** + array of PK flags (1 byte per column), ** + table name (nul terminated). */ static int sessionChangesetReadTblhdr(sqlite3_changeset_iter *p){ int rc; int nCopy; assert( p->rc==SQLITE_OK ); rc = sessionChangesetBufferTblhdr(&p->in, &nCopy); if( rc==SQLITE_OK ){ int nByte; int nVarint; nVarint = sessionVarintGet(&p->in.aData[p->in.iNext], &p->nCol); nCopy -= nVarint; p->in.iNext += nVarint; nByte = p->nCol * sizeof(sqlite3_value*) * 2 + nCopy; p->tblhdr.nBuf = 0; sessionBufferGrow(&p->tblhdr, nByte, &rc); } if( rc==SQLITE_OK ){ int iPK = sizeof(sqlite3_value*)*p->nCol*2; memset(p->tblhdr.aBuf, 0, iPK); memcpy(&p->tblhdr.aBuf[iPK], &p->in.aData[p->in.iNext], nCopy); p->in.iNext += nCopy; } p->apValue = (sqlite3_value**)p->tblhdr.aBuf; p->abPK = (u8*)&p->apValue[p->nCol*2]; p->zTab = (char*)&p->abPK[p->nCol]; return (p->rc = rc); |
︙ | ︙ | |||
2301 2302 2303 2304 2305 2306 2307 | ** remaining data if there are less than 10 bytes available. This is ** sufficient either for the 'T' or 'P' byte and the varint that follows ** it, or for the two single byte values otherwise. */ p->rc = sessionInputBuffer(&p->in, 2); if( p->rc!=SQLITE_OK ) return p->rc; /* If the iterator is already at the end of the changeset, return DONE. */ | | | | | | | | | 2348 2349 2350 2351 2352 2353 2354 2355 2356 2357 2358 2359 2360 2361 2362 2363 2364 2365 2366 2367 2368 2369 2370 2371 2372 2373 2374 2375 2376 2377 2378 2379 2380 2381 2382 2383 2384 2385 2386 2387 2388 2389 2390 2391 2392 2393 2394 2395 2396 2397 | ** remaining data if there are less than 10 bytes available. This is ** sufficient either for the 'T' or 'P' byte and the varint that follows ** it, or for the two single byte values otherwise. */ p->rc = sessionInputBuffer(&p->in, 2); if( p->rc!=SQLITE_OK ) return p->rc; /* If the iterator is already at the end of the changeset, return DONE. */ if( p->in.iNext>=p->in.nData ){ return SQLITE_DONE; } op = p->in.aData[p->in.iNext++]; if( op=='T' || op=='P' ){ p->bPatchset = (op=='P'); if( sessionChangesetReadTblhdr(p) ) return p->rc; if( (p->rc = sessionInputBuffer(&p->in, 2)) ) return p->rc; op = p->in.aData[p->in.iNext++]; } p->op = op; p->bIndirect = p->in.aData[p->in.iNext++]; if( p->op!=SQLITE_UPDATE && p->op!=SQLITE_DELETE && p->op!=SQLITE_INSERT ){ return (p->rc = SQLITE_CORRUPT_BKPT); } if( paRec ){ *paRec = &p->in.aData[p->in.iNext]; } /* If this is an UPDATE or DELETE, read the old.* record. */ if( p->op!=SQLITE_INSERT && (p->bPatchset==0 || p->op==SQLITE_DELETE) ){ u8 *abPK = p->bPatchset ? p->abPK : 0; p->rc = sessionReadRecord(&p->in, p->nCol, abPK, paRec?0:p->apValue); if( p->rc!=SQLITE_OK ) return p->rc; } /* If this is an INSERT or UPDATE, read the new.* record. */ if( p->op!=SQLITE_DELETE ){ sqlite3_value **apOut = (paRec ? 0 : &p->apValue[p->nCol]); p->rc = sessionReadRecord(&p->in, p->nCol, 0, apOut); if( p->rc!=SQLITE_OK ) return p->rc; } if( pnRec ){ *pnRec = (int)(&p->in.aData[p->in.iNext] - *paRec); }else if( p->bPatchset && p->op==SQLITE_UPDATE ){ /* If this is an UPDATE that is part of a patchset, then all PK and ** modified fields are present in the new.* record. The old.* record ** is currently completely empty. This block shifts the PK fields from ** new.* to old.*, to accommodate the code that reads these arrays. */ int i; for(i=0; i<p->nCol; i++){ |
︙ | ︙ | |||
2526 2527 2528 2529 2530 2531 2532 2533 2534 2535 2536 2537 2538 2539 | int sqlite3changeset_finalize(sqlite3_changeset_iter *p){ int i; /* Used to iterate through p->apValue[] */ int rc = p->rc; /* Return code */ if( p->apValue ){ for(i=0; i<p->nCol*2; i++) sqlite3ValueFree(p->apValue[i]); } sqlite3_free(p->tblhdr.aBuf); sqlite3_free(p); return rc; } /* ** Invert a changeset object. */ | > | 2573 2574 2575 2576 2577 2578 2579 2580 2581 2582 2583 2584 2585 2586 2587 | int sqlite3changeset_finalize(sqlite3_changeset_iter *p){ int i; /* Used to iterate through p->apValue[] */ int rc = p->rc; /* Return code */ if( p->apValue ){ for(i=0; i<p->nCol*2; i++) sqlite3ValueFree(p->apValue[i]); } sqlite3_free(p->tblhdr.aBuf); sqlite3_free(p->in.buf.aBuf); sqlite3_free(p); return rc; } /* ** Invert a changeset object. */ |
︙ | ︙ | |||
2556 2557 2558 2559 2560 2561 2562 | /* Zero the output variables in case an error occurs. */ *ppInverted = 0; *pnInverted = 0; if( nChangeset==0 ) return SQLITE_OK; /* Set up the input stream */ memset(&sInput, 0, sizeof(SessionInput)); | | | | | | | | | | 2604 2605 2606 2607 2608 2609 2610 2611 2612 2613 2614 2615 2616 2617 2618 2619 2620 2621 2622 2623 2624 2625 2626 2627 2628 2629 2630 2631 2632 2633 2634 2635 2636 2637 2638 2639 2640 2641 2642 2643 2644 2645 2646 2647 2648 2649 2650 2651 2652 2653 2654 2655 2656 2657 2658 2659 2660 2661 2662 2663 2664 2665 2666 2667 2668 2669 2670 2671 2672 2673 2674 2675 2676 2677 2678 2679 2680 2681 2682 2683 2684 2685 2686 2687 2688 2689 | /* Zero the output variables in case an error occurs. */ *ppInverted = 0; *pnInverted = 0; if( nChangeset==0 ) return SQLITE_OK; /* Set up the input stream */ memset(&sInput, 0, sizeof(SessionInput)); sInput.nData = nChangeset; sInput.aData = (u8*)pChangeset; aOut = (u8 *)sqlite3_malloc(nChangeset); if( !aOut ) return SQLITE_NOMEM; aIn = (u8 *)pChangeset; i = 0; while( i<nChangeset ){ u8 eType; if( (rc = sessionInputBuffer(&sInput, 2)) ) goto finished_invert; eType = sInput.aData[sInput.iNext]; switch( eType ){ case 'T': { /* A 'table' record consists of: ** ** * A constant 'T' character, ** * Number of columns in said table (a varint), ** * An array of nCol bytes (sPK), ** * A nul-terminated table name. */ int nByte; int nVarint; int iNext = sInput.iNext; sInput.iNext++; if( (rc = sessionChangesetBufferTblhdr(&sInput, &nByte)) ){ goto finished_invert; } nVarint = sessionVarintGet(&sInput.aData[iNext+1], &nCol); sPK.nBuf = 0; sessionAppendBlob(&sPK, &sInput.aData[iNext+1+nVarint], nCol, &rc); if( rc ) goto finished_invert; sInput.iNext += nByte; memcpy(&aOut[i], &sInput.aData[iNext], nByte+1); i += nByte+1; sqlite3_free(apVal); apVal = 0; abPK = sPK.aBuf; break; } case SQLITE_INSERT: case SQLITE_DELETE: { int iStart; int nByte; sInput.iNext += 2; iStart = sInput.iNext; sessionReadRecord(&sInput, nCol, 0, 0); aOut[i] = (eType==SQLITE_DELETE ? SQLITE_INSERT : SQLITE_DELETE); aOut[i+1] = aIn[i+1]; /* indirect-flag */ nByte = sInput.iNext - iStart; memcpy(&aOut[i+2], &sInput.aData[iStart], nByte); i += 2 + nByte; break; } case SQLITE_UPDATE: { int iCol; int nWrite = 0; if( 0==apVal ){ apVal = (sqlite3_value **)sqlite3_malloc(sizeof(apVal[0])*nCol*2); if( 0==apVal ){ rc = SQLITE_NOMEM; goto finished_invert; } memset(apVal, 0, sizeof(apVal[0])*nCol*2); } /* Write the header for the new UPDATE change. Same as the original. */ aOut[i] = SQLITE_UPDATE; aOut[i+1] = sInput.aData[sInput.iNext+1]; nWrite = 2; /* Read the old.* and new.* records for the update change. */ sInput.iNext += 2; rc = sessionReadRecord(&sInput, nCol, 0, &apVal[0]); if( rc==SQLITE_OK ){ rc = sessionReadRecord(&sInput, nCol, 0, &apVal[nCol]); |
︙ | ︙ | |||
2667 2668 2669 2670 2671 2672 2673 | i += nWrite; assert( i==sInput.iNext ); break; } default: | | | 2715 2716 2717 2718 2719 2720 2721 2722 2723 2724 2725 2726 2727 2728 2729 | i += nWrite; assert( i==sInput.iNext ); break; } default: rc = SQLITE_CORRUPT_BKPT; goto finished_invert; } } assert( rc==SQLITE_OK ); *pnInverted = nChangeset; *ppInverted = (void *)aOut; |
︙ | ︙ | |||
3254 3255 3256 3257 3258 3259 3260 | } } return rc; } /* | > > | | | | < | < < < < | 3302 3303 3304 3305 3306 3307 3308 3309 3310 3311 3312 3313 3314 3315 3316 3317 3318 3319 3320 3321 3322 3323 3324 3325 3326 3327 3328 3329 3330 3331 3332 3333 3334 3335 3336 3337 3338 3339 3340 3341 3342 3343 3344 | } } return rc; } /* ** Argument pIter is a changeset iterator that has been initialized, but ** not yet passed to sqlite3changeset_next(). This function applies the ** changeset to the main database attached to handle "db". The supplied ** conflict handler callback is invoked to resolve any conflicts encountered ** while applying the change. */ static int sessionChangesetApply( sqlite3 *db, /* Apply change to "main" db of this handle */ sqlite3_changeset_iter *pIter, /* Changeset to apply */ int(*xFilter)( void *pCtx, /* Copy of sixth arg to _apply() */ const char *zTab /* Table name */ ), int(*xConflict)( void *pCtx, /* Copy of fifth arg to _apply() */ int eConflict, /* DATA, MISSING, CONFLICT, CONSTRAINT */ sqlite3_changeset_iter *p /* Handle describing change and conflict */ ), void *pCtx /* First argument passed to xConflict */ ){ int schemaMismatch = 0; int rc; /* Return code */ const char *zTab = 0; /* Name of current table */ int nTab = 0; /* Result of sqlite3Strlen30(zTab) */ SessionApplyCtx sApply; /* changeset_apply() context object */ assert( xConflict!=0 ); memset(&sApply, 0, sizeof(sApply)); sqlite3_mutex_enter(sqlite3_db_mutex(db)); rc = sqlite3_exec(db, "SAVEPOINT changeset_apply", 0, 0, 0); if( rc==SQLITE_OK ){ rc = sqlite3_exec(db, "PRAGMA defer_foreign_keys = 1", 0, 0, 0); } while( rc==SQLITE_OK && SQLITE_ROW==sqlite3changeset_next(pIter) ){ int nCol; |
︙ | ︙ | |||
3425 3426 3427 3428 3429 3430 3431 3432 3433 3434 3435 3436 3437 3438 | sqlite3_finalize(sApply.pDelete); sqlite3_finalize(sApply.pUpdate); sqlite3_finalize(sApply.pSelect); sqlite3_free((char*)sApply.azCol); /* cast works around VC++ bug */ sqlite3_mutex_leave(sqlite3_db_mutex(db)); return rc; } /* ** This function is called to merge two changes to the same row together as ** part of an sqlite3changeset_concat() operation. A new change object is ** allocated and a pointer to it stored in *ppNew. */ static int sessionChangeMerge( | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 3470 3471 3472 3473 3474 3475 3476 3477 3478 3479 3480 3481 3482 3483 3484 3485 3486 3487 3488 3489 3490 3491 3492 3493 3494 3495 3496 3497 3498 3499 3500 3501 3502 3503 3504 3505 3506 3507 3508 3509 3510 3511 3512 3513 3514 3515 3516 3517 3518 3519 3520 3521 3522 3523 3524 3525 3526 3527 3528 3529 3530 3531 3532 3533 3534 3535 3536 3537 3538 3539 | sqlite3_finalize(sApply.pDelete); sqlite3_finalize(sApply.pUpdate); sqlite3_finalize(sApply.pSelect); sqlite3_free((char*)sApply.azCol); /* cast works around VC++ bug */ sqlite3_mutex_leave(sqlite3_db_mutex(db)); return rc; } /* ** Apply the changeset passed via pChangeset/nChangeset to the main database ** attached to handle "db". Invoke the supplied conflict handler callback ** to resolve any conflicts encountered while applying the change. */ int sqlite3changeset_apply( sqlite3 *db, /* Apply change to "main" db of this handle */ int nChangeset, /* Size of changeset in bytes */ void *pChangeset, /* Changeset blob */ int(*xFilter)( void *pCtx, /* Copy of sixth arg to _apply() */ const char *zTab /* Table name */ ), int(*xConflict)( void *pCtx, /* Copy of fifth arg to _apply() */ int eConflict, /* DATA, MISSING, CONFLICT, CONSTRAINT */ sqlite3_changeset_iter *p /* Handle describing change and conflict */ ), void *pCtx /* First argument passed to xConflict */ ){ sqlite3_changeset_iter *pIter; /* Iterator to skip through changeset */ int rc = sqlite3changeset_start(&pIter, nChangeset, pChangeset); if( rc==SQLITE_OK ){ rc = sessionChangesetApply(db, pIter, xFilter, xConflict, pCtx); } return rc; } /* ** Apply the changeset passed via xInput/pIn to the main database ** attached to handle "db". Invoke the supplied conflict handler callback ** to resolve any conflicts encountered while applying the change. */ int sqlite3changeset_apply_str( sqlite3 *db, /* Apply change to "main" db of this handle */ int (*xInput)(void *pIn, void *pData, int *pnData), /* Input function */ void *pIn, /* First arg for xInput */ int(*xFilter)( void *pCtx, /* Copy of sixth arg to _apply() */ const char *zTab /* Table name */ ), int(*xConflict)( void *pCtx, /* Copy of sixth arg to _apply() */ int eConflict, /* DATA, MISSING, CONFLICT, CONSTRAINT */ sqlite3_changeset_iter *p /* Handle describing change and conflict */ ), void *pCtx /* First argument passed to xConflict */ ){ sqlite3_changeset_iter *pIter; /* Iterator to skip through changeset */ int rc = sqlite3changeset_start_str(&pIter, xInput, pIn); if( rc==SQLITE_OK ){ rc = sessionChangesetApply(db, pIter, xFilter, xConflict, pCtx); } return rc; } /* ** This function is called to merge two changes to the same row together as ** part of an sqlite3changeset_concat() operation. A new change object is ** allocated and a pointer to it stored in *ppNew. */ static int sessionChangeMerge( |
︙ | ︙ |
Changes to ext/session/sqlite3session.h.
︙ | ︙ | |||
888 889 890 891 892 893 894 895 896 897 898 899 900 901 | ** SQLite error code returned. */ int sqlite3changeset_apply( sqlite3 *db, /* Apply change to "main" db of this handle */ int nChangeset, /* Size of changeset in bytes */ void *pChangeset, /* Changeset blob */ int(*xFilter)( void *pCtx, /* Copy of sixth arg to _apply() */ const char *zTab /* Table name */ ), int(*xConflict)( void *pCtx, /* Copy of sixth arg to _apply() */ int eConflict, /* DATA, MISSING, CONFLICT, CONSTRAINT */ sqlite3_changeset_iter *p /* Handle describing change and conflict */ | > > > > > > > > > > > > > > > > > > > > > > > > | 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 | ** SQLite error code returned. */ int sqlite3changeset_apply( sqlite3 *db, /* Apply change to "main" db of this handle */ int nChangeset, /* Size of changeset in bytes */ void *pChangeset, /* Changeset blob */ int(*xFilter)( void *pCtx, /* Copy of sixth arg to _apply() */ const char *zTab /* Table name */ ), int(*xConflict)( void *pCtx, /* Copy of sixth arg to _apply() */ int eConflict, /* DATA, MISSING, CONFLICT, CONSTRAINT */ sqlite3_changeset_iter *p /* Handle describing change and conflict */ ), void *pCtx /* First argument passed to xConflict */ ); /* ** This function is similar to sqlite3changeset_apply(), except that instead ** of reading data from a single buffer, it requests it one chunk at a time ** from the application by invoking the supplied xInput() callback. ** ** See the documentation for sqlite3changeset_start_str() for a description ** of how the xInput callback should be implemented. */ int sqlite3changeset_apply_str( sqlite3 *db, /* Apply change to "main" db of this handle */ int (*xInput)(void *pIn, void *pData, int *pnData), /* Input function */ void *pIn, /* First arg for xInput */ int(*xFilter)( void *pCtx, /* Copy of sixth arg to _apply() */ const char *zTab /* Table name */ ), int(*xConflict)( void *pCtx, /* Copy of sixth arg to _apply() */ int eConflict, /* DATA, MISSING, CONFLICT, CONSTRAINT */ sqlite3_changeset_iter *p /* Handle describing change and conflict */ |
︙ | ︙ |
Changes to ext/session/test_session.c.
︙ | ︙ | |||
9 10 11 12 13 14 15 16 17 18 19 20 | typedef struct TestSession TestSession; struct TestSession { sqlite3_session *pSession; Tcl_Interp *interp; Tcl_Obj *pFilterScript; }; #define SESSION_STREAM_TCL_VAR "sqlite3session_streams" /* ** Attempt to find the global variable zVar within interpreter interp | > > > > > > > > | | | | | | | 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 | typedef struct TestSession TestSession; struct TestSession { sqlite3_session *pSession; Tcl_Interp *interp; Tcl_Obj *pFilterScript; }; typedef struct TestStreamInput TestStreamInput; struct TestStreamInput { int nStream; /* Maximum chunk size */ unsigned char *aData; /* Pointer to buffer containing data */ int nData; /* Size of buffer aData in bytes */ int iData; /* Bytes of data already read by sessions */ }; #define SESSION_STREAM_TCL_VAR "sqlite3session_streams" /* ** Attempt to find the global variable zVar within interpreter interp ** and extract an integer value from it. Return this value. ** ** If the named variable cannot be found, or if it cannot be interpreted ** as a integer, return 0. */ static int test_tcl_integer(Tcl_Interp *interp, const char *zVar){ Tcl_Obj *pObj; int iVal = 0; pObj = Tcl_ObjGetVar2(interp, Tcl_NewStringObj(zVar, -1), 0, TCL_GLOBAL_ONLY); if( pObj ) Tcl_GetIntFromObj(0, pObj, &iVal); return iVal; } static int test_session_error(Tcl_Interp *interp, int rc){ extern const char *sqlite3ErrName(int); Tcl_SetObjResult(interp, Tcl_NewStringObj(sqlite3ErrName(rc), -1)); return TCL_ERROR; } |
︙ | ︙ | |||
145 146 147 148 149 150 151 | } break; } case 7: /* patchset */ case 1: { /* changeset */ TestSessionsBlob o = {0, 0}; | | | 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 | } break; } case 7: /* patchset */ case 1: { /* changeset */ TestSessionsBlob o = {0, 0}; if( test_tcl_integer(interp, SESSION_STREAM_TCL_VAR) ){ void *pCtx = (void*)&o; if( iSub==7 ){ rc = sqlite3session_patchset_str(pSession, testSessionsOutput, pCtx); }else{ rc = sqlite3session_changeset_str(pSession, testSessionsOutput, pCtx); } }else{ |
︙ | ︙ | |||
539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 | } if( eConf==SQLITE_CHANGESET_DATA ){ return SQLITE_CHANGESET_REPLACE; } return SQLITE_CHANGESET_OMIT; } /* ** sqlite3changeset_apply DB CHANGESET CONFLICT-SCRIPT ?FILTER-SCRIPT? */ static int test_sqlite3changeset_apply( void * clientData, Tcl_Interp *interp, int objc, Tcl_Obj *CONST objv[] ){ sqlite3 *db; /* Database handle */ Tcl_CmdInfo info; /* Database Tcl command (objv[1]) info */ int rc; /* Return code from changeset_invert() */ void *pChangeset; /* Buffer containing changeset */ int nChangeset; /* Size of buffer aChangeset in bytes */ TestConflictHandler ctx; if( objc!=4 && objc!=5 ){ Tcl_WrongNumArgs(interp, 1, objv, "DB CHANGESET CONFLICT-SCRIPT ?FILTER-SCRIPT?" ); return TCL_ERROR; } if( 0==Tcl_GetCommandInfo(interp, Tcl_GetString(objv[1]), &info) ){ Tcl_AppendResult(interp, "no such handle: ", Tcl_GetString(objv[2]), 0); return TCL_ERROR; } db = *(sqlite3 **)info.objClientData; pChangeset = (void *)Tcl_GetByteArrayFromObj(objv[2], &nChangeset); ctx.pConflictScript = objv[3]; ctx.pFilterScript = objc==5 ? objv[4] : 0; ctx.interp = interp; | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | | | > > > > > > > > | 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 | } if( eConf==SQLITE_CHANGESET_DATA ){ return SQLITE_CHANGESET_REPLACE; } return SQLITE_CHANGESET_OMIT; } static int testStreamInput( void *pCtx, /* Context pointer */ void *pData, /* Buffer to populate */ int *pnData /* IN/OUT: Bytes requested/supplied */ ){ TestStreamInput *p = (TestStreamInput*)pCtx; int nReq = *pnData; /* Bytes of data requested */ int nRem = p->nData - p->iData; /* Bytes of data available */ int nRet = p->nStream; /* Bytes actually returned */ if( nRet>nReq ) nRet = nReq; if( nRet>nRem ) nRet = nRem; assert( nRet>=0 ); if( nRet>0 ){ memcpy(pData, &p->aData[p->iData], nRet); p->iData += nRet; } *pnData = nRet; return SQLITE_OK; } /* ** sqlite3changeset_apply DB CHANGESET CONFLICT-SCRIPT ?FILTER-SCRIPT? */ static int test_sqlite3changeset_apply( void * clientData, Tcl_Interp *interp, int objc, Tcl_Obj *CONST objv[] ){ sqlite3 *db; /* Database handle */ Tcl_CmdInfo info; /* Database Tcl command (objv[1]) info */ int rc; /* Return code from changeset_invert() */ void *pChangeset; /* Buffer containing changeset */ int nChangeset; /* Size of buffer aChangeset in bytes */ TestConflictHandler ctx; TestStreamInput sStr; memset(&sStr, 0, sizeof(sStr)); sStr.nStream = test_tcl_integer(interp, SESSION_STREAM_TCL_VAR); if( objc!=4 && objc!=5 ){ Tcl_WrongNumArgs(interp, 1, objv, "DB CHANGESET CONFLICT-SCRIPT ?FILTER-SCRIPT?" ); return TCL_ERROR; } if( 0==Tcl_GetCommandInfo(interp, Tcl_GetString(objv[1]), &info) ){ Tcl_AppendResult(interp, "no such handle: ", Tcl_GetString(objv[2]), 0); return TCL_ERROR; } db = *(sqlite3 **)info.objClientData; pChangeset = (void *)Tcl_GetByteArrayFromObj(objv[2], &nChangeset); ctx.pConflictScript = objv[3]; ctx.pFilterScript = objc==5 ? objv[4] : 0; ctx.interp = interp; if( sStr.nStream==0 ){ rc = sqlite3changeset_apply(db, nChangeset, pChangeset, (objc==5) ? test_filter_handler : 0, test_conflict_handler, (void *)&ctx ); }else{ sStr.aData = (unsigned char*)pChangeset; sStr.nData = nChangeset; rc = sqlite3changeset_apply_str(db, testStreamInput, (void*)&sStr, (objc==5) ? test_filter_handler : 0, test_conflict_handler, (void *)&ctx ); } if( rc!=SQLITE_OK ){ return test_session_error(interp, rc); } Tcl_ResetResult(interp); return TCL_OK; } |
︙ | ︙ | |||
628 629 630 631 632 633 634 | void * clientData, Tcl_Interp *interp, int objc, Tcl_Obj *CONST objv[] ){ int rc; /* Return code from changeset_invert() */ void *aChangeset; /* Input changeset */ | | | | | 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 | void * clientData, Tcl_Interp *interp, int objc, Tcl_Obj *CONST objv[] ){ int rc; /* Return code from changeset_invert() */ void *aChangeset; /* Input changeset */ int nChangeset; /* Size of buffer aChangeset in bytes */ void *aOut; /* Output changeset */ int nOut; /* Size of buffer aOut in bytes */ if( objc!=2 ){ Tcl_WrongNumArgs(interp, 1, objv, "CHANGESET"); return TCL_ERROR; } aChangeset = (void *)Tcl_GetByteArrayFromObj(objv[1], &nChangeset); rc = sqlite3changeset_invert(nChangeset, aChangeset, &nOut, &aOut); if( rc!=SQLITE_OK ){ return test_session_error(interp, rc); } Tcl_SetObjResult(interp, Tcl_NewByteArrayObj((unsigned char *)aOut, nOut)); sqlite3_free(aOut); return TCL_OK; } |
︙ | ︙ | |||
689 690 691 692 693 694 695 | */ static int test_sqlite3session_foreach( void * clientData, Tcl_Interp *interp, int objc, Tcl_Obj *CONST objv[] ){ | | | > > > | > > | > > > > > | 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 | */ static int test_sqlite3session_foreach( void * clientData, Tcl_Interp *interp, int objc, Tcl_Obj *CONST objv[] ){ void *pChangeset; int nChangeset; sqlite3_changeset_iter *pIter; int rc; Tcl_Obj *pVarname; Tcl_Obj *pCS; Tcl_Obj *pScript; int isCheckNext = 0; TestStreamInput sStr; memset(&sStr, 0, sizeof(sStr)); if( objc>1 ){ char *zOpt = Tcl_GetString(objv[1]); isCheckNext = (strcmp(zOpt, "-next")==0); } if( objc!=4+isCheckNext ){ Tcl_WrongNumArgs(interp, 1, objv, "?-next? VARNAME CHANGESET SCRIPT"); return TCL_ERROR; } pVarname = objv[1+isCheckNext]; pCS = objv[2+isCheckNext]; pScript = objv[3+isCheckNext]; pChangeset = (void *)Tcl_GetByteArrayFromObj(pCS, &nChangeset); sStr.nStream = test_tcl_integer(interp, SESSION_STREAM_TCL_VAR); if( sStr.nStream==0 ){ rc = sqlite3changeset_start(&pIter, nChangeset, pChangeset); }else{ sStr.aData = (unsigned char*)pChangeset; sStr.nData = nChangeset; rc = sqlite3changeset_start_str(&pIter, testStreamInput, (void*)&sStr); } if( rc!=SQLITE_OK ){ return test_session_error(interp, rc); } while( SQLITE_ROW==sqlite3changeset_next(pIter) ){ int nCol; /* Number of columns in table */ int nCol2; /* Number of columns in table */ |
︙ | ︙ |
Changes to test/session.test.
︙ | ︙ | |||
12 13 14 15 16 17 18 19 20 21 | source $testdir/permutations.test ifcapable session { # First run tests with sqlite3_extended_error_codes() set, then # again with it clear. run_test_suite session_eec run_test_suite session } finish_test | > | 12 13 14 15 16 17 18 19 20 21 22 | source $testdir/permutations.test ifcapable session { # First run tests with sqlite3_extended_error_codes() set, then # again with it clear. run_test_suite session_eec run_test_suite session run_test_suite session_str } finish_test |