Many hyperlinks are disabled.
Use anonymous login
to enable hyperlinks.
Overview
Comment: | Make sure constraint checks occur in the correct order, even in the presence of upserts. |
---|---|
Downloads: | Tarball | ZIP archive |
Timelines: | family | ancestors | descendants | both | upsert |
Files: | files | file ages | folders |
SHA3-256: |
07fb30c3de7ff396ae2ce8a0d20352b5 |
User & Date: | drh 2018-04-14 20:24:36.183 |
Context
2018-04-14
| ||
22:35 | Get upsert working on WITHOUT ROWID tables. (check-in: d3c53fd317 user: drh tags: upsert) | |
20:24 | Make sure constraint checks occur in the correct order, even in the presence of upserts. (check-in: 07fb30c3de user: drh tags: upsert) | |
2018-04-13
| ||
21:55 | First cut at logic to perform DO UPDATE for rowid tables. (check-in: a9080bc8b8 user: drh tags: upsert) | |
Changes
Changes to src/insert.c.
︙ | ︙ | |||
1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 | } testcase( w.eCode==0 ); testcase( w.eCode==CKCNSTRNT_COLUMN ); testcase( w.eCode==CKCNSTRNT_ROWID ); testcase( w.eCode==(CKCNSTRNT_ROWID|CKCNSTRNT_COLUMN) ); return !w.eCode; } /* ** Generate code to do constraint checks prior to an INSERT or an UPDATE ** on table pTab. ** ** The regNewData parameter is the first register in a range that contains ** the data to be inserted or the data after the update. There will be | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 | } testcase( w.eCode==0 ); testcase( w.eCode==CKCNSTRNT_COLUMN ); testcase( w.eCode==CKCNSTRNT_ROWID ); testcase( w.eCode==(CKCNSTRNT_ROWID|CKCNSTRNT_COLUMN) ); return !w.eCode; } /* ** An instance of the ConstraintAddr object remembers the byte-code addresses ** for sections of the constraint checks that deal with uniqueness constraints ** on the rowid and on the upsert constraint. ** ** This information is passed into reorderConstraintChecks() to insert ** some OP_Goto operations so that the rowid and upsert constraints occur ** in the correct order relative to other constraints. */ typedef struct ConstraintAddr ConstraintAddr; struct ConstraintAddr { int ipkTop; /* Subroutine for rowid constraint check */ int upsertTop; /* Label for upsert constraint check subroutine */ int ipkBtm; /* Return opcode rowid constraint check */ int upsertBtm; /* upsert constraint returns to this label */ }; /* ** Generate any OP_Goto operations needed to cause constraints to be ** run that haven't already been run. 
*/ static void reorderConstraintChecks(Vdbe *v, ConstraintAddr *p){ if( p->upsertTop ){ sqlite3VdbeGoto(v, p->upsertTop); VdbeComment((v, "call upsert subroutine")); sqlite3VdbeResolveLabel(v, p->upsertBtm); p->upsertTop = 0; } if( p->ipkTop ){ sqlite3VdbeGoto(v, p->ipkTop); VdbeComment((v, "call rowid constraint-check subroutine")); sqlite3VdbeJumpHere(v, p->ipkBtm); p->ipkTop = 0; } } /* ** Generate code to do constraint checks prior to an INSERT or an UPDATE ** on table pTab. ** ** The regNewData parameter is the first register in a range that contains ** the data to be inserted or the data after the update. There will be |
︙ | ︙ | |||
1262 1263 1264 1265 1266 1267 1268 | int i; /* loop counter */ int ix; /* Index loop counter */ int nCol; /* Number of columns */ int onError; /* Conflict resolution strategy */ int addr1; /* Address of jump instruction */ int seenReplace = 0; /* True if REPLACE is used to resolve INT PK conflict */ int nPkField; /* Number of fields in PRIMARY KEY. 1 for ROWID tables */ | | | > > > | 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 | int i; /* loop counter */ int ix; /* Index loop counter */ int nCol; /* Number of columns */ int onError; /* Conflict resolution strategy */ int addr1; /* Address of jump instruction */ int seenReplace = 0; /* True if REPLACE is used to resolve INT PK conflict */ int nPkField; /* Number of fields in PRIMARY KEY. 1 for ROWID tables */ ConstraintAddr sAddr;/* Address information for constraint reordering */ Index *pUpIdx = 0; /* Index to which to apply the upsert */ u8 isUpdate; /* True if this is an UPDATE operation */ u8 bAffinityDone = 0; /* True if the OP_Affinity operation has been run */ int upsertBypass = 0; /* Address of Goto to bypass upsert subroutine */ isUpdate = regOldData!=0; db = pParse->db; v = sqlite3GetVdbe(pParse); assert( v!=0 ); assert( pTab->pSelect==0 ); /* This table is not a VIEW */ nCol = pTab->nCol; sAddr.ipkTop = 0; sAddr.upsertTop = 0; /* pPk is the PRIMARY KEY index for WITHOUT ROWID tables and NULL for ** normal rowid tables. nPkField is the number of key fields in the ** pPk index or 1 for a rowid table. In other words, nPkField is the ** number of fields in the true primary key of the table. */ if( HasRowid(pTab) ){ pPk = 0; |
︙ | ︙ | |||
1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 | P5_ConstraintCheck); } sqlite3VdbeResolveLabel(v, allOk); } pParse->iSelfTab = 0; } #endif /* !defined(SQLITE_OMIT_CHECK) */ /* If rowid is changing, make sure the new rowid does not previously ** exist in the table. */ if( pkChng && pPk==0 ){ int addrRowidOk = sqlite3VdbeMakeLabel(v); | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 | P5_ConstraintCheck); } sqlite3VdbeResolveLabel(v, allOk); } pParse->iSelfTab = 0; } #endif /* !defined(SQLITE_OMIT_CHECK) */ /* UNIQUE and PRIMARY KEY constraints should be handled in the following ** order: ** ** (1) OE_Abort, OE_Fail, OE_Rollback, OE_Ignore ** (2) OE_Update ** (3) OE_Replace ** ** OE_Fail and OE_Ignore must happen before any changes are made. ** OE_Update guarantees that only a single row will change, so it ** must happen before OE_Replace. Technically, OE_Abort and OE_Rollback ** could happen in any order, but they are grouped up front for ** convenience. ** ** Constraint checking code is generated in this order: ** (A) The rowid constraint ** (B) Unique index constraints that do not have OE_Replace as their ** default conflict resolution strategy ** (C) Unique index that do use OE_Replace by default. ** ** The ordering of (2) and (3) is accomplished by making sure the linked ** list of indexes attached to a table puts all OE_Replace indexes last ** in the list. See sqlite3CreateIndex() for where that happens. 
*/ /* If there is an ON CONFLICT clause without a constraint-target ** (In other words, one of "ON CONFLICT DO NOTHING" or ** "ON DUPLICATE KEY UPDATE") then change the overrideError to ** whichever is appropriate. */ if( pUpsert ){ if( pUpsert->pUpsertTarget==0 ){ if( pUpsert->pUpsertSet==0 ){ /* An ON CONFLICT DO NOTHING clause, without a constraint-target. ** Make all unique constraint resolution be OE_Ignore */ overrideError = OE_Ignore; pUpsert = 0; }else{ /* An ON DUPLICATE KEY UPDATE clause. All unique constraints ** do upsert processing */ overrideError = OE_Update; } }else if( (pUpIdx = pUpsert->pUpsertIdx)!=0 ){ sAddr.upsertTop = sqlite3VdbeMakeLabel(v); sAddr.upsertBtm = sqlite3VdbeMakeLabel(v); } } /* If rowid is changing, make sure the new rowid does not previously ** exist in the table. */ if( pkChng && pPk==0 ){ int addrRowidOk = sqlite3VdbeMakeLabel(v); |
︙ | ︙ | |||
1396 1397 1398 1399 1400 1401 1402 | ** is unchanged. */ sqlite3VdbeAddOp3(v, OP_Eq, regNewData, addrRowidOk, regOldData); sqlite3VdbeChangeP5(v, SQLITE_NOTNULL); VdbeCoverage(v); } /* figure out whether or not upsert applies in this case */ | | > > > > > > | | < > | < < < > | 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 | ** is unchanged. */ sqlite3VdbeAddOp3(v, OP_Eq, regNewData, addrRowidOk, regOldData); sqlite3VdbeChangeP5(v, SQLITE_NOTNULL); VdbeCoverage(v); } /* figure out whether or not upsert applies in this case */ if( pUpsert && pUpsert->pUpsertIdx==0 ){ if( pUpsert->pUpsertSet==0 ){ onError = OE_Ignore; /* DO NOTHING is the same as INSERT OR IGNORE */ }else{ onError = OE_Update; /* DO UPDATE */ } } /* If the response to a rowid conflict is REPLACE but the response ** to some other UNIQUE constraint is FAIL or IGNORE, then we need ** to defer the running of the rowid conflict checking until after ** the UNIQUE constraints have run. */ assert( OE_Update>OE_Replace ); assert( OE_Ignore<OE_Replace ); assert( OE_Fail<OE_Replace ); assert( OE_Abort<OE_Replace ); assert( OE_Rollback<OE_Replace ); if( onError>=OE_Replace && onError!=overrideError && pTab->pIndex ){ sAddr.ipkTop = sqlite3VdbeAddOp0(v, OP_Goto)+1; } /* Check to see if the new rowid already exists in the table. Skip ** the following conflict logic if it does not. */ VdbeNoopComment((v, "constraint checks for ROWID")); sqlite3VdbeAddOp3(v, OP_NotExists, iDataCur, addrRowidOk, regNewData); VdbeCoverage(v); switch( onError ){ default: { onError = OE_Abort; /* Fall thru into the next case */ |
︙ | ︙ | |||
1495 1496 1497 1498 1499 1500 1501 | #endif case OE_Ignore: { sqlite3VdbeGoto(v, ignoreDest); break; } } sqlite3VdbeResolveLabel(v, addrRowidOk); | | | | > > > > | > | 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 | #endif case OE_Ignore: { sqlite3VdbeGoto(v, ignoreDest); break; } } sqlite3VdbeResolveLabel(v, addrRowidOk); if( sAddr.ipkTop ){ sAddr.ipkBtm = sqlite3VdbeAddOp0(v, OP_Goto); sqlite3VdbeJumpHere(v, sAddr.ipkTop-1); } } /* Test all UNIQUE constraints by creating entries for each UNIQUE ** index and making sure that duplicate entries do not already exist. ** Compute the revised record entries for indices as we go. ** ** This loop also handles the case of the PRIMARY KEY index for a ** WITHOUT ROWID table. */ for(ix=0, pIdx=pTab->pIndex; pIdx; pIdx=pIdx->pNext, ix++){ int regIdx; /* Range of registers holding content for pIdx */ int regR; /* Range of registers holding conflicting PK */ int iThisCur; /* Cursor for this UNIQUE index */ int addrUniqueOk; /* Jump here if the UNIQUE constraint is satisfied */ if( aRegIdx[ix]==0 ) continue; /* Skip indices that do not change */ VdbeNoopComment((v, "constraint checks for %s", pIdx->zName)); if( bAffinityDone==0 ){ sqlite3TableAffinity(v, pTab, regNewData+1); bAffinityDone = 1; } iThisCur = iIdxCur+ix; if( pUpIdx==pIdx ){ addrUniqueOk = sAddr.upsertBtm; }else{ addrUniqueOk = sqlite3VdbeMakeLabel(v); } /* Skip partial indices for which the WHERE clause is not true */ if( pIdx->pPartIdxWhere ){ sqlite3VdbeAddOp2(v, OP_Null, 0, aRegIdx[ix]); pParse->iSelfTab = -(regNewData+1); sqlite3ExprIfFalseDup(pParse, pIdx->pPartIdxWhere, addrUniqueOk, SQLITE_JUMPIFNULL); |
︙ | ︙ | |||
1579 1580 1581 1582 1583 1584 1585 | continue; /* pIdx is not a UNIQUE index */ } if( overrideError!=OE_Default ){ onError = overrideError; }else if( onError==OE_Default ){ onError = OE_Abort; } | > > | > | > > > | 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698 1699 1700 1701 | continue; /* pIdx is not a UNIQUE index */ } if( overrideError!=OE_Default ){ onError = overrideError; }else if( onError==OE_Default ){ onError = OE_Abort; } if( onError==OE_Replace ){ reorderConstraintChecks(v, &sAddr); } /* Figure out if the upsert clause applies to this index */ if( pUpIdx==pIdx ){ if( pUpsert->pUpsertSet==0 ){ onError = OE_Ignore; /* DO NOTHING is the same as INSERT OR IGNORE */ }else{ onError = OE_Update; /* DO UPDATE */ } upsertBypass = sqlite3VdbeGoto(v, 0); VdbeComment((v, "Upsert bypass")); sqlite3VdbeResolveLabel(v, sAddr.upsertTop); } /* Collision detection may be omitted if all of the following are true: ** (1) The conflict resolution algorithm is REPLACE ** (2) The table is a WITHOUT ROWID table ** (3) There are no secondary indexes on the table ** (4) No delete triggers need to be fired if there is a conflict |
︙ | ︙ | |||
1706 1707 1708 1709 1710 1711 1712 | seenReplace = 1; break; } } sqlite3VdbeResolveLabel(v, addrUniqueOk); sqlite3ExprCachePop(pParse); if( regR!=regIdx ) sqlite3ReleaseTempRange(pParse, regR, nPkField); | > | < < < > | 1807 1808 1809 1810 1811 1812 1813 1814 1815 1816 1817 1818 1819 1820 1821 1822 1823 1824 | seenReplace = 1; break; } } sqlite3VdbeResolveLabel(v, addrUniqueOk); sqlite3ExprCachePop(pParse); if( regR!=regIdx ) sqlite3ReleaseTempRange(pParse, regR, nPkField); if( pUpIdx==pIdx ) sqlite3VdbeJumpHere(v, upsertBypass); } reorderConstraintChecks(v, &sAddr); *pbMayReplace = seenReplace; VdbeModuleComment((v, "END: GenCnstCks(%d)", seenReplace)); } #ifdef SQLITE_ENABLE_NULL_TRIM /* |
︙ | ︙ |