Many hyperlinks are disabled.
Use anonymous login
to enable hyperlinks.
Overview
Comment: | Fix rebasing of UPDATE changes against a set of remote changesets that feature both OMIT and REPLACE conflict resolution on different fields of the same row. |
---|---|
Downloads: | Tarball | ZIP archive |
Timelines: | family | ancestors | descendants | both | sessions-rebase |
Files: | files | file ages | folders |
SHA3-256: |
d8bc3fdb6ba165ca8d7cab857ede8e7e |
User & Date: | dan 2018-03-21 17:29:53.542 |
Context
2018-03-21
| ||
19:46 | Fix some documentation and other issues with the code on this branch. (check-in: a9ec68627a user: dan tags: sessions-rebase) | |
17:29 | Fix rebasing of UPDATE changes against a set of remote changesets that feature both OMIT and REPLACE conflict resolution on different fields of the same row. (check-in: d8bc3fdb6b user: dan tags: sessions-rebase) | |
2018-03-20
| ||
20:27 | Add further tests and documentation for the sessions rebase feature. (check-in: 7475a363eb user: dan tags: sessions-rebase) | |
Changes
Changes to ext/session/sessionrebase.test.
︙ | ︙ | |||
116 117 118 119 120 121 122 | lappend rebase [sqlite3changeset_apply_v2 db $c xConflict] } } #if {$tn=="2.1.4"} { puts [changeset_to_list $rebase] ; breakpoint } #puts [changeset_to_list [lindex $rebase 0]] ; breakpoint #puts [llength $rebase] | < < | 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 | lappend rebase [sqlite3changeset_apply_v2 db $c xConflict] } } #if {$tn=="2.1.4"} { puts [changeset_to_list $rebase] ; breakpoint } #puts [changeset_to_list [lindex $rebase 0]] ; breakpoint #puts [llength $rebase] sqlite3rebaser_create R foreach r $rebase { R configure $r } set c1r [R rebase $c1] R delete #if {$tn=="2.1.4"} { puts [changeset_to_list $c1r] } sqlite3changeset_apply_v2 db2 $c1r xConflictAbort |
︙ | ︙ |
Changes to ext/session/sqlite3session.c.
︙ | ︙ | |||
512 513 514 515 516 517 518 | ** The buffer that the argument points to contains a serialized SQL value. ** Return the number of bytes of space occupied by the value (including ** the type byte). */ static int sessionSerialLen(u8 *a){ int e = *a; int n; | | | 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 | ** The buffer that the argument points to contains a serialized SQL value. ** Return the number of bytes of space occupied by the value (including ** the type byte). */ static int sessionSerialLen(u8 *a){ int e = *a; int n; if( e==0 || e==0xFF ) return 1; if( e==SQLITE_NULL ) return 1; if( e==SQLITE_INTEGER || e==SQLITE_FLOAT ) return 9; return sessionVarintGet(&a[1], &n) + 1 + n; } /* ** Based on the primary key values stored in change aRecord, calculate a |
︙ | ︙ | |||
4523 4524 4525 4526 4527 4528 4529 4530 4531 4532 4533 4534 4535 4536 4537 4538 | int op2, /* Second change operation */ int bIndirect, /* True if second change is indirect */ u8 *aRec, /* Second change record */ int nRec, /* Number of bytes in aRec */ SessionChange **ppNew /* OUT: Merged change */ ){ SessionChange *pNew = 0; if( !pExist ){ pNew = (SessionChange *)sqlite3_malloc(sizeof(SessionChange) + nRec); if( !pNew ){ return SQLITE_NOMEM; } memset(pNew, 0, sizeof(SessionChange)); pNew->op = op2; pNew->bIndirect = bIndirect; | > < > > | > > > > > > > > > > > > > > > > > > | > > > > > > > > > > > | | > > | > | < > > > > | > > > > > > > > > > > > > > | 4523 4524 4525 4526 4527 4528 4529 4530 4531 4532 4533 4534 4535 4536 4537 4538 4539 4540 4541 4542 4543 4544 4545 4546 4547 4548 4549 4550 4551 4552 4553 4554 4555 4556 4557 4558 4559 4560 4561 4562 4563 4564 4565 4566 4567 4568 4569 4570 4571 4572 4573 4574 4575 4576 4577 4578 4579 4580 4581 4582 4583 4584 4585 4586 4587 4588 4589 4590 4591 4592 4593 4594 4595 4596 4597 4598 4599 4600 4601 4602 4603 4604 4605 4606 | int op2, /* Second change operation */ int bIndirect, /* True if second change is indirect */ u8 *aRec, /* Second change record */ int nRec, /* Number of bytes in aRec */ SessionChange **ppNew /* OUT: Merged change */ ){ SessionChange *pNew = 0; int rc = SQLITE_OK; if( !pExist ){ pNew = (SessionChange *)sqlite3_malloc(sizeof(SessionChange) + nRec); if( !pNew ){ return SQLITE_NOMEM; } memset(pNew, 0, sizeof(SessionChange)); pNew->op = op2; pNew->bIndirect = bIndirect; pNew->aRecord = (u8*)&pNew[1]; if( bIndirect==0 || bRebase==0 ){ pNew->nRecord = nRec; memcpy(pNew->aRecord, aRec, nRec); }else{ int i; u8 *pIn = aRec; u8 *pOut = pNew->aRecord; for(i=0; i<pTab->nCol; i++){ int nIn = sessionSerialLen(pIn); if( *pIn==0 ){ *pOut++ = 0; }else if( pTab->abPK[i]==0 ){ *pOut++ = 0xFF; }else{ memcpy(pOut, pIn, nIn); pOut += nIn; } pIn += nIn; } pNew->nRecord = pOut - pNew->aRecord; } }else if( bRebase ){ if( pExist->op==SQLITE_DELETE && pExist->bIndirect ){ *ppNew = pExist; }else{ int nByte = nRec + pExist->nRecord + sizeof(SessionChange); pNew = (SessionChange*)sqlite3_malloc(nByte); if( pNew==0 ){ rc = SQLITE_NOMEM; }else{ int i; u8 *a1 = pExist->aRecord; u8 *a2 = aRec; u8 *pOut; memset(pNew, 0, nByte); pNew->bIndirect = bIndirect || pExist->bIndirect; pNew->op = op2; pOut = pNew->aRecord = (u8*)&pNew[1]; for(i=0; i<pTab->nCol; i++){ int n1 = sessionSerialLen(a1); int n2 = sessionSerialLen(a2); if( *a1==0xFF || *a2==0xFF ){ *pOut++ = 0xFF; }else if( *a2==0 ){ memcpy(pOut, a1, n1); pOut += n1; }else{ memcpy(pOut, a2, n2); pOut += n2; } a1 += n1; a2 += n2; } pNew->nRecord = pOut - pNew->aRecord; } sqlite3_free(pExist); } }else{ int op1 = pExist->op; /* ** op1=INSERT, op2=INSERT -> Unsupported. Discard op2. ** op1=INSERT, op2=UPDATE -> INSERT. ** op1=INSERT, op2=DELETE -> (none) |
︙ | ︙ | |||
4635 4636 4637 4638 4639 4640 4641 | pNew->nRecord = (int)(aCsr - pNew->aRecord); } sqlite3_free(pExist); } } *ppNew = pNew; | | | 4686 4687 4688 4689 4690 4691 4692 4693 4694 4695 4696 4697 4698 4699 4700 | pNew->nRecord = (int)(aCsr - pNew->aRecord); } sqlite3_free(pExist); } } *ppNew = pNew; return rc; } /* ** Add all changes in the changeset traversed by the iterator passed as ** the first argument to the changegroup hash tables. */ static int sessionChangesetToHash( |
︙ | ︙ | |||
4995 4996 4997 4998 4999 5000 5001 | memcpy(pOut, a2, nn2); pOut += nn2; }else{ memcpy(pOut, a1, nn1); pOut += nn1; } }else{ | | | | | | | > > > > | < | 5046 5047 5048 5049 5050 5051 5052 5053 5054 5055 5056 5057 5058 5059 5060 5061 5062 5063 5064 5065 5066 5067 5068 5069 5070 5071 5072 5073 5074 5075 5076 5077 5078 5079 5080 5081 5082 5083 5084 5085 5086 5087 5088 5089 5090 5091 5092 5093 5094 5095 5096 5097 5098 5099 5100 5101 5102 5103 5104 5105 5106 5107 5108 5109 5110 5111 5112 5113 5114 5115 5116 5117 5118 5119 5120 5121 5122 5123 5124 5125 5126 5127 5128 | memcpy(pOut, a2, nn2); pOut += nn2; }else{ memcpy(pOut, a1, nn1); pOut += nn1; } }else{ if( *a1==0 || *a1==0xFF ){ memcpy(pOut, a2, nn2); pOut += nn2; }else{ memcpy(pOut, a1, nn1); pOut += nn1; } } a1 += nn1; a2 += nn2; } pBuf->nBuf = pOut-pBuf->aBuf; assert( pBuf->nBuf<=pBuf->nAlloc ); } } static void sessionAppendPartialUpdate( SessionBuffer *pBuf, /* Append record here */ sqlite3_changeset_iter *pIter, /* Iterator pointed at local change */ u8 *aRec, int nRec, /* Local change */ u8 *aChange, int nChange, /* Record to rebase against */ int *pRc /* IN/OUT: Return Code */ ){ sessionBufferGrow(pBuf, 2+nRec+nChange, pRc); if( *pRc==SQLITE_OK ){ int bData = 0; u8 *pOut = &pBuf->aBuf[pBuf->nBuf]; int i; u8 *a1 = aRec; u8 *a2 = aChange; *pOut++ = SQLITE_UPDATE; *pOut++ = pIter->bIndirect; for(i=0; i<pIter->nCol; i++){ int n1 = sessionSerialLen(a1); int n2 = sessionSerialLen(a2); if( pIter->abPK[i] || a2[0]==0 ){ if( !pIter->abPK[i] ) bData = 1; memcpy(pOut, a1, n1); pOut += n1; }else if( a2[0]!=0xFF ){ bData = 1; memcpy(pOut, a2, n2); pOut += n2; }else{ *pOut++ = '\0'; } a1 += n1; a2 += n2; } if( bData ){ a2 = aChange; for(i=0; i<pIter->nCol; i++){ int n1 = sessionSerialLen(a1); int n2 = sessionSerialLen(a2); if( pIter->abPK[i] || a2[0]!=0xFF ){ memcpy(pOut, a1, n1); pOut += n1; }else{ *pOut++ = '\0'; } a1 += n1; a2 += n2; } pBuf->nBuf = (pOut - pBuf->aBuf); } } } static int sessionRebase( sqlite3_rebaser *p, /* Rebaser hash table */ sqlite3_changeset_iter *pIter, /* Input data */ int (*xOutput)(void *pOut, const void *pData, int nData), void *pOut, /* Context for xOutput callback */ int *pnOut, /* OUT: Number of bytes in output changeset */ |
︙ | ︙ | |||
5135 5136 5137 5138 5139 5140 5141 | sessionAppendByte(&sOut, pIter->bIndirect, &rc); sessionAppendRecordMerge(&sOut, pIter->nCol, 1, pCsr, nRec-(pCsr-aRec), pChange->aRecord, pChange->nRecord, &rc ); } }else{ | < < < < < < < < < < | | | < | 5189 5190 5191 5192 5193 5194 5195 5196 5197 5198 5199 5200 5201 5202 5203 5204 5205 | sessionAppendByte(&sOut, pIter->bIndirect, &rc); sessionAppendRecordMerge(&sOut, pIter->nCol, 1, pCsr, nRec-(pCsr-aRec), pChange->aRecord, pChange->nRecord, &rc ); } }else{ sessionAppendPartialUpdate(&sOut, pIter, aRec, nRec, pChange->aRecord, pChange->nRecord, &rc ); } break; default: assert( pIter->op==SQLITE_DELETE ); bDone = 1; if( pChange->op==SQLITE_INSERT ){ |
︙ | ︙ |
Changes to ext/session/sqlite3session.h.
︙ | ︙ | |||
1252 1253 1254 1255 1256 1257 1258 | ** changeset. Or, if the conflict resolution was REPLACE, add ** nothing to the rebased changeset. ** ** <dt>Local DELETE<dd> ** This may conflict with a remote UPDATE or DELETE. In both cases the ** only possible resolution is OMIT. If the remote operation was a ** DELETE, then add no change to the rebased changeset. If the remote | | | | | | > > > > > > > > > > > > > > > > > | 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 | ** changeset. Or, if the conflict resolution was REPLACE, add ** nothing to the rebased changeset. ** ** <dt>Local DELETE<dd> ** This may conflict with a remote UPDATE or DELETE. In both cases the ** only possible resolution is OMIT. If the remote operation was a ** DELETE, then add no change to the rebased changeset. If the remote ** operation was an UPDATE, then the old.* fields of change are updated ** to reflect the new.* values in the UPDATE. ** ** <dt>Local UPDATE<dd> ** This may conflict with a remote UPDATE or DELETE. If it conflicts ** with a DELETE, and the conflict resolution was OMIT, then the update ** is changed into an INSERT. Any undefined values in the new.* record ** from the update change are filled in using the old.* values from ** the conflicting DELETE. Or, if the conflict resolution was REPLACE, ** the UPDATE change is simply omitted from the rebased changeset. ** ** If conflict is with a remote UPDATE and the resolution is OMIT, then ** the old.* values are rebased using the new.* values in the remote ** change. Or, if the resolution is REPLACE, then the change is copied ** into the rebased changeset with updates to columns also updated by ** the conflicting remote UPDATE removed. If this means no columns would ** be updated, the change is omitted. ** </dl> ** ** A local change may be rebased against multiple remote changes ** simultaneously. If a single key is modified by multiple remote ** changesets, they are combined as follows before the local changeset ** is rebased: ** ** <ul> ** <li> If there has been one or more REPLACE resolutions on a ** key, it is rebased according to a REPLACE. ** ** <li> If there have been no REPLACE resolutions on a key, then ** the local changeset is rebased according to the most recent ** of the OMIT resolutions. ** </ul> ** ** Note that conflict resolutions from multiple remote changesets are ** combined on a per-field basis, not per-row. This means that in the ** case of multiple remote UPDATE operations, some fields of a single ** local change may be rebased for REPLACE while others are rebased for ** OMIT. */ typedef struct sqlite3_rebaser sqlite3_rebaser; /* Create a new rebaser object */ int sqlite3rebaser_create(sqlite3_rebaser **ppNew); /* Call this one or more times to configure a rebaser */ |
︙ | ︙ |