/ Check-in [940606b3]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:In checkindex.c, use C code instead of SQL/group_concat() to compose various SQL clauses. This is to make it easier to support indexes on expressions.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | checkindex
Files: files | file ages | folders
SHA3-256:940606b3af059eb3f79d71fec871ea88df8bce0349f5b33b79c147a85610e269
User & Date: dan 2017-10-30 17:05:18
Context
2017-10-30
19:38
Add support for indexes on expressions to incremental_index_check. check-in: 8c1c701f user: dan tags: checkindex
17:05
In checkindex.c, use C code instead of SQL/group_concat() to compose various SQL clauses. This is to make it easier to support indexes on expressions. check-in: 940606b3 user: dan tags: checkindex
08:04
Fix an issue in incremental_index_check with indexes that use non-default collation sequences. check-in: 3ebb2351 user: dan tags: checkindex
Changes
Hide Diffs Side-by-Side Diffs Ignore Whitespace Patch

Changes to ext/repair/checkindex.c.

    43     43   struct CidxCursor {
    44     44     sqlite3_vtab_cursor base;       /* Base class.  Must be first */
    45     45     sqlite3_stmt *pStmt;
    46     46   };
    47     47   
    48     48   typedef struct CidxColumn CidxColumn;
    49     49   struct CidxColumn {
    50         -  char *zName;
    51         -  char *zColl;
    52         -  int bDesc;
           50  +  char *zExpr;                    /* Text for indexed expression */
           51  +  int bDesc;                      /* True for DESC columns, otherwise false */
           52  +  int bKey;                       /* Part of index, not PK */
           53  +};
           54  +
           55  +typedef struct CidxIndex CidxIndex;
           56  +struct CidxIndex {
           57  +  int nCol;                       /* Elements in aCol[] array */
           58  +  CidxColumn aCol[1];             /* Array of indexed columns */
    53     59   };
    54     60   
    55     61   static void *cidxMalloc(int *pRc, int n){
    56     62     void *pRet = 0;
    57     63     assert( n!=0 );
    58     64     if( *pRc==SQLITE_OK ){
    59     65       pRet = sqlite3_malloc(n);
................................................................................
   253    259     if( *pRc==SQLITE_OK ){
   254    260       int n = strlen(zStr);
   255    261       zRet = cidxMalloc(pRc, n+1);
   256    262       if( zRet ) memcpy(zRet, zStr, n+1);
   257    263     }
   258    264     return zRet;
   259    265   }
          266  +
          267  +static void cidxFreeIndex(CidxIndex *pIdx){
          268  +  if( pIdx ){
          269  +    int i;
          270  +    for(i=0; i<pIdx->nCol; i++){
          271  +      sqlite3_free(pIdx->aCol[i].zExpr);
          272  +    }
          273  +    sqlite3_free(pIdx);
          274  +  }
          275  +}
   260    276   
   261    277   static int cidxLookupIndex(
   262    278     CidxCursor *pCsr,               /* Cursor object */
   263    279     const char *zIdx,               /* Name of index to look up */
   264         -  int *pnCol,                     /* OUT: Number of columns in index */
   265         -  CidxColumn **paCol,             /* OUT: Columns */
   266         -  char **pzTab,                   /* OUT: Table name */
   267         -  char **pzCurrentKey,            /* OUT: Expression for current_key */
   268         -  char **pzOrderBy,               /* OUT: ORDER BY expression list */
   269         -  char **pzSubWhere,              /* OUT: sub-query WHERE clause */
   270         -  char **pzSubExpr                /* OUT: sub-query WHERE clause */
          280  +  CidxIndex **ppIdx,              /* OUT: Description of columns */
          281  +  char **pzTab                    /* OUT: Table name */
   271    282   ){
   272    283     int rc = SQLITE_OK;
   273    284     char *zTab = 0;
   274         -  char *zCurrentKey = 0;
   275         -  char *zOrderBy = 0;
   276         -  char *zSubWhere = 0;
   277         -  char *zSubExpr = 0;
   278         -  CidxColumn *aCol = 0;
          285  +  CidxIndex *pIdx = 0;
   279    286   
   280    287     sqlite3_stmt *pFindTab = 0;
   281         -  sqlite3_stmt *pGroup = 0;
          288  +  sqlite3_stmt *pInfo = 0;
   282    289       
   283         -  /* Find the table */
          290  +  /* Find the table for this index. */
   284    291     pFindTab = cidxPrepare(&rc, pCsr, 
   285    292         "SELECT tbl_name FROM sqlite_master WHERE name=%Q AND type='index'",
   286    293         zIdx
   287    294     );
   288    295     if( rc==SQLITE_OK && sqlite3_step(pFindTab)==SQLITE_ROW ){
   289    296       zTab = cidxStrdup(&rc, (const char*)sqlite3_column_text(pFindTab, 0));
   290    297     }
   291    298     cidxFinalize(&rc, pFindTab);
   292    299     if( rc==SQLITE_OK && zTab==0 ){
   293    300       rc = SQLITE_ERROR;
   294    301     }
   295    302   
   296         -  pGroup = cidxPrepare(&rc, pCsr,
   297         -      "SELECT group_concat("
   298         -      "  coalesce('quote(' || name || ')', 'rowid'), '|| '','' ||'"
   299         -      ") AS zCurrentKey,"
   300         -      "       group_concat("
   301         -      "  coalesce(name, 'rowid') || ' COLLATE ' || coll "
   302         -      "  || CASE WHEN desc THEN ' DESC' ELSE '' END,"
   303         -      "  ', '"
   304         -      ") AS zOrderBy,"
   305         -      "       group_concat("
   306         -      "         CASE WHEN key==1 THEN NULL ELSE "
   307         -      "  coalesce(name, 'rowid') || ' IS \"%w\".' || coalesce(name, 'rowid') "
   308         -      "         END,"
   309         -      "  ' AND '"
   310         -      ") AS zSubWhere,"
   311         -      "       group_concat("
   312         -      "         CASE WHEN key==0 THEN NULL ELSE "
   313         -      "  coalesce(name, 'rowid') || ' IS \"%w\".' || coalesce(name, 'rowid') "
   314         -      "         END,"
   315         -      "  ' AND '"
   316         -      ") AS zSubExpr,"
   317         -      "      count(*) AS nCol"
   318         -      " FROM pragma_index_xinfo(%Q);"
   319         -      , zIdx, zIdx, zIdx
   320         -  );
   321         -  if( rc==SQLITE_OK && sqlite3_step(pGroup)==SQLITE_ROW ){
   322         -    zCurrentKey = cidxStrdup(&rc, (const char*)sqlite3_column_text(pGroup, 0));
   323         -    zOrderBy = cidxStrdup(&rc, (const char*)sqlite3_column_text(pGroup, 1));
   324         -    zSubWhere = cidxStrdup(&rc, (const char*)sqlite3_column_text(pGroup, 2));
   325         -    zSubExpr = cidxStrdup(&rc, (const char*)sqlite3_column_text(pGroup, 3));
   326         -    *pnCol = sqlite3_column_int(pGroup, 4);
   327         -  }
   328         -  cidxFinalize(&rc, pGroup);
   329         -
   330         -  pGroup = cidxPrepare(&rc, pCsr, "PRAGMA index_xinfo(%Q)", zIdx);
          303  +  pInfo = cidxPrepare(&rc, pCsr, "PRAGMA index_xinfo(%Q)", zIdx);
   331    304     if( rc==SQLITE_OK ){
   332         -    int nByte = 0;
   333         -    int nCol = 0;
   334         -    while( sqlite3_step(pGroup)==SQLITE_ROW ){
   335         -      const char *zName = (const char*)sqlite3_column_text(pGroup, 2);
   336         -      const char *zColl = (const char*)sqlite3_column_text(pGroup, 4);
          305  +    int nAlloc = 0;
          306  +    int iCol = 0;
          307  +
          308  +    while( sqlite3_step(pInfo)==SQLITE_ROW ){
          309  +      const char *zName = (const char*)sqlite3_column_text(pInfo, 2);
          310  +      const char *zColl = (const char*)sqlite3_column_text(pInfo, 4);
          311  +      CidxColumn *p;
   337    312         if( zName==0 ) zName = "rowid";
   338         -      nCol++;
   339         -      nByte += strlen(zName)+1 + strlen(zColl)+1;
          313  +      if( iCol==nAlloc ){
          314  +        int nByte = sizeof(CidxIndex) + sizeof(CidxColumn)*(nAlloc+8);
          315  +        pIdx = (CidxIndex*)sqlite3_realloc(pIdx, nByte);
          316  +      }
          317  +      p = &pIdx->aCol[iCol++];
          318  +      p->zExpr = cidxMprintf(&rc, "\"%w\" COLLATE %s",zName,zColl);
          319  +      p->bDesc = sqlite3_column_int(pInfo, 3);
          320  +      p->bKey = sqlite3_column_int(pInfo, 5);
          321  +      pIdx->nCol = iCol;
   340    322       }
   341         -    rc = sqlite3_reset(pGroup);
   342         -    aCol = (CidxColumn*)cidxMalloc(&rc, sizeof(CidxColumn)*nCol + nByte);
   343         -
   344         -    if( rc==SQLITE_OK ){
   345         -      int iCol = 0;
   346         -      char *z = (char*)&aCol[nCol];
   347         -      while( sqlite3_step(pGroup)==SQLITE_ROW ){
   348         -        int nName, nColl;
   349         -        const char *zName = (const char*)sqlite3_column_text(pGroup, 2);
   350         -        const char *zColl = (const char*)sqlite3_column_text(pGroup, 4);
   351         -        if( zName==0 ) zName = "rowid";
   352         -
   353         -        nName = strlen(zName);
   354         -        nColl = strlen(zColl);
   355         -        memcpy(z, zName, nName);
   356         -        aCol[iCol].zName = z;
   357         -        z += nName+1;
   358         -
   359         -        memcpy(z, zColl, nColl);
   360         -        aCol[iCol].zColl = z;
   361         -        z += nColl+1;
   362         -
   363         -        aCol[iCol].bDesc = sqlite3_column_int(pGroup, 3);
   364         -        iCol++;
   365         -      }
   366         -    }
   367         -    cidxFinalize(&rc, pGroup);
          323  +    cidxFinalize(&rc, pInfo);
   368    324     }
   369    325     
   370    326     if( rc!=SQLITE_OK ){
   371    327       sqlite3_free(zTab);
   372         -    sqlite3_free(zCurrentKey);
   373         -    sqlite3_free(zOrderBy);
   374         -    sqlite3_free(zSubWhere);
   375         -    sqlite3_free(zSubExpr);
   376         -    sqlite3_free(aCol);
          328  +    cidxFreeIndex(pIdx);
   377    329     }else{
   378    330       *pzTab = zTab;
   379         -    *pzCurrentKey = zCurrentKey;
   380         -    *pzOrderBy = zOrderBy;
   381         -    *pzSubWhere = zSubWhere;
   382         -    *pzSubExpr = zSubExpr;
   383         -    *paCol = aCol;
          331  +    *ppIdx = pIdx;
   384    332     }
   385    333   
   386    334     return rc;
   387    335   }
   388    336   
   389    337   static int cidxDecodeAfter(
   390    338     CidxCursor *pCsr, 
................................................................................
   459    407     int *pRc, CidxColumn *aCol, char **azAfter, int iGt, int bLastIsNull
   460    408   ){
   461    409     char *zRet = 0;
   462    410     const char *zSep = "";
   463    411     int i;
   464    412   
   465    413     for(i=0; i<iGt; i++){
   466         -    zRet = cidxMprintf(pRc, "%z%s%s COLLATE %s IS %s", zRet, 
   467         -        zSep, aCol[i].zName, aCol[i].zColl, (azAfter[i] ? azAfter[i] : "NULL")
          414  +    zRet = cidxMprintf(pRc, "%z%s%s IS %s", zRet, 
          415  +        zSep, aCol[i].zExpr, (azAfter[i] ? azAfter[i] : "NULL")
   468    416       );
   469    417       zSep = " AND ";
   470    418     }
   471    419   
   472    420     if( bLastIsNull ){
   473         -    zRet = cidxMprintf(pRc, "%z%s%s IS NULL", zRet, zSep, aCol[iGt].zName);
          421  +    zRet = cidxMprintf(pRc, "%z%s%s IS NULL", zRet, zSep, aCol[iGt].zExpr);
   474    422     }
   475    423     else if( azAfter[iGt] ){
   476         -    zRet = cidxMprintf(pRc, "%z%s%s COLLATE %s %s %s", zRet, 
   477         -        zSep, aCol[iGt].zName, aCol[iGt].zColl, (aCol[iGt].bDesc ? "<" : ">"), 
          424  +    zRet = cidxMprintf(pRc, "%z%s%s %s %s", zRet, 
          425  +        zSep, aCol[iGt].zExpr, (aCol[iGt].bDesc ? "<" : ">"), 
   478    426           azAfter[iGt]
   479    427       );
   480    428     }else{
   481         -    zRet = cidxMprintf(pRc, "%z%s%s IS NOT NULL", zRet, zSep, aCol[iGt].zName);
          429  +    zRet = cidxMprintf(pRc, "%z%s%s IS NOT NULL", zRet, zSep, aCol[iGt].zExpr);
   482    430     }
   483    431   
   484    432     return zRet;
   485    433   }
   486    434   
   487         -static char *cidxColumnList(int *pRc, CidxColumn *aCol, int nCol){
   488         -  int i;
          435  +#define CIDX_CLIST_ALL         0
          436  +#define CIDX_CLIST_ORDERBY     1
          437  +#define CIDX_CLIST_CURRENT_KEY 2
          438  +#define CIDX_CLIST_SUBWHERE    3
          439  +#define CIDX_CLIST_SUBEXPR     4
          440  +
          441  +/*
          442  +** This function returns various strings based on the contents of the
          443  +** CidxIndex structure and the eType parameter.
          444  +*/
          445  +static char *cidxColumnList(
          446  +  int *pRc,                       /* IN/OUT: Error code */
          447  +  const char *zIdx,
          448  +  CidxIndex *pIdx,                /* Indexed columns */
          449  +  int eType                       /* True to include ASC/DESC */
          450  +){
   489    451     char *zRet = 0;
   490         -  const char *zSep = "";
   491         -  for(i=0; i<nCol; i++){
   492         -    zRet = cidxMprintf(pRc, "%z%s%s", zRet, zSep, aCol[i].zName);
   493         -    zSep = ",";
          452  +  if( *pRc==SQLITE_OK ){
          453  +    const char *aDir[2] = {" ASC", " DESC"};
          454  +    int i;
          455  +    const char *zSep = "";
          456  +
          457  +    for(i=0; i<pIdx->nCol; i++){
          458  +      CidxColumn *p = &pIdx->aCol[i];
          459  +      assert( pIdx->aCol[i].bDesc==0 || pIdx->aCol[i].bDesc==1 );
          460  +      switch( eType ){
          461  +
          462  +        case CIDX_CLIST_ORDERBY:
          463  +          zRet = cidxMprintf(pRc, "%z%s%s%s",zRet,zSep,p->zExpr,aDir[p->bDesc]);
          464  +          zSep = ",";
          465  +          break;
          466  +
          467  +        case CIDX_CLIST_CURRENT_KEY:
          468  +          zRet = cidxMprintf(pRc, "%z%squote(%s)", zRet, zSep, p->zExpr);
          469  +          zSep = "||','||";
          470  +          break;
          471  +
          472  +        case CIDX_CLIST_SUBWHERE:
          473  +          if( p->bKey==0 ){
          474  +            zRet = cidxMprintf(pRc, "%z%s%s IS \"%w\".%s", zRet, 
          475  +                zSep, p->zExpr, zIdx, p->zExpr
          476  +            );
          477  +            zSep = " AND ";
          478  +          }
          479  +          break;
          480  +
          481  +        case CIDX_CLIST_SUBEXPR:
          482  +          if( p->bKey==1 ){
          483  +            zRet = cidxMprintf(pRc, "%z%s%s IS \"%w\".%s", zRet, 
          484  +                zSep, p->zExpr, zIdx, p->zExpr
          485  +            );
          486  +            zSep = " AND ";
          487  +          }
          488  +          break;
          489  +
          490  +        default:
          491  +          assert( eType==CIDX_CLIST_ALL );
          492  +          zRet = cidxMprintf(pRc, "%z%s%s", zRet, zSep, p->zExpr);
          493  +          zSep = ",";
          494  +          break;
          495  +      }
          496  +    }
   494    497     }
          498  +
   495    499     return zRet;
   496    500   }
   497    501   
   498    502   /* 
   499    503   ** Position a cursor back to the beginning.
   500    504   */
   501    505   static int cidxFilter(
................................................................................
   512    516       zIdxName = (const char*)sqlite3_value_text(argv[0]);
   513    517       if( argc>1 ){
   514    518         zAfterKey = (const char*)sqlite3_value_text(argv[1]);
   515    519       }
   516    520     }
   517    521   
   518    522     if( zIdxName ){
   519         -    int nCol = 0;
   520    523       char *zTab = 0;
   521    524       char *zCurrentKey = 0;
   522    525       char *zOrderBy = 0;
   523    526       char *zSubWhere = 0;
   524    527       char *zSubExpr = 0;
          528  +    char *zSrcList = 0;
          529  +
   525    530       char **azAfter = 0;
   526         -    CidxColumn *aCol = 0;
          531  +    CidxIndex *pIdx = 0;
          532  +
          533  +    rc = cidxLookupIndex(pCsr, zIdxName, &pIdx, &zTab);
   527    534   
   528         -    rc = cidxLookupIndex(pCsr, zIdxName, 
   529         -        &nCol, &aCol, &zTab, &zCurrentKey, &zOrderBy, &zSubWhere, &zSubExpr
   530         -    );
          535  +    zOrderBy = cidxColumnList(&rc, zIdxName, pIdx, CIDX_CLIST_ORDERBY);
          536  +    zCurrentKey = cidxColumnList(&rc, zIdxName, pIdx, CIDX_CLIST_CURRENT_KEY);
          537  +    zSubWhere = cidxColumnList(&rc, zIdxName, pIdx, CIDX_CLIST_SUBWHERE);
          538  +    zSubExpr = cidxColumnList(&rc, zIdxName, pIdx, CIDX_CLIST_SUBEXPR);
          539  +    /* zSrcList = cidxColumnList(&rc, zIdxName, pIdx, CIDX_CLIST_ALL); */
   531    540   
   532    541       if( rc==SQLITE_OK && zAfterKey ){
   533         -      rc = cidxDecodeAfter(pCsr, nCol, zAfterKey, &azAfter);
          542  +      rc = cidxDecodeAfter(pCsr, pIdx->nCol, zAfterKey, &azAfter);
   534    543       }
   535    544   
   536    545       if( rc || zAfterKey==0 ){
   537    546         pCsr->pStmt = cidxPrepare(&rc, pCsr, 
   538    547             "SELECT (SELECT %s FROM %Q WHERE %s), %s FROM %Q AS %Q ORDER BY %s",
   539    548             zSubExpr, zTab, zSubWhere, zCurrentKey, zTab, zIdxName, zOrderBy
   540    549         );
   541         -      /* printf("SQL: %s\n", sqlite3_sql(pCsr->pStmt)); */
          550  +      /* printf("SQL: %s\n", sqlite3_sql(pCsr->pStmt));  */
   542    551       }else{
   543         -      char *zList = cidxColumnList(&rc, aCol, nCol);
          552  +      char *zList = cidxColumnList(&rc, zIdxName, pIdx, 0);
   544    553         const char *zSep = "";
   545    554         char *zSql;
   546    555         int i;
   547    556   
   548    557         zSql = cidxMprintf(&rc, "SELECT (SELECT %s FROM %Q WHERE %s), %s FROM (",
   549    558             zSubExpr, zTab, zSubWhere, zCurrentKey
   550    559         );
   551         -      for(i=nCol-1; i>=0; i--){
          560  +      for(i=pIdx->nCol-1; i>=0; i--){
   552    561           int j;
   553         -        if( aCol[i].bDesc && azAfter[i]==0 ) continue;
          562  +        if( pIdx->aCol[i].bDesc && azAfter[i]==0 ) continue;
   554    563           for(j=0; j<2; j++){
   555         -          char *zWhere = cidxWhere(&rc, aCol, azAfter, i, j);
          564  +          char *zWhere = cidxWhere(&rc, pIdx->aCol, azAfter, i, j);
   556    565             zSql = cidxMprintf(&rc, 
   557    566                 "%z%s SELECT * FROM (SELECT %s FROM %Q WHERE %z ORDER BY %s)",
   558    567                 zSql, zSep, zList, zTab, zWhere, zOrderBy
   559    568                 );
   560    569             zSep = " UNION ALL ";
   561         -          if( aCol[i].bDesc==0 ) break;
          570  +          if( pIdx->aCol[i].bDesc==0 ) break;
   562    571           }
   563    572         }
   564    573         zSql = cidxMprintf(&rc, "%z) AS %Q", zSql, zIdxName);
   565    574         sqlite3_free(zList);
   566    575   
   567    576         /* printf("SQL: %s\n", zSql); */
   568    577         pCsr->pStmt = cidxPrepare(&rc, pCsr, "%z", zSql);
................................................................................
   569    578       }
   570    579   
   571    580       sqlite3_free(zTab);
   572    581       sqlite3_free(zCurrentKey);
   573    582       sqlite3_free(zOrderBy);
   574    583       sqlite3_free(zSubWhere);
   575    584       sqlite3_free(zSubExpr);
   576         -    sqlite3_free(aCol);
          585  +    cidxFreeIndex(pIdx);
   577    586       sqlite3_free(azAfter);
   578    587     }
   579    588   
   580    589     if( pCsr->pStmt ){
   581    590       assert( rc==SQLITE_OK );
   582    591       rc = cidxNext(pCursor);
   583    592     }
   584    593     return rc;
   585    594   }
   586    595   
   587         -/* Return a column for the sqlite_btreeinfo table */
          596  +/* 
          597  +** Return a column value.
          598  +*/
   588    599   static int cidxColumn(
   589    600     sqlite3_vtab_cursor *pCursor, 
   590    601     sqlite3_context *ctx, 
   591    602     int iCol
   592    603   ){
   593    604     CidxCursor *pCsr = (CidxCursor*)pCursor;
   594    605     assert( iCol==0 || iCol==1 );

Changes to test/checkindex.test.

    63     63   
    64     64   proc do_index_check_test {tn idx res} {
    65     65     uplevel [list do_execsql_test $tn.1 "
    66     66       SELECT errmsg, current_key FROM incremental_index_check('$idx');
    67     67     " $res]
    68     68   
    69     69     uplevel [list do_test $tn.2 "incr_index_check $idx 1" [list {*}$res]]
    70         -  #uplevel [list do_test $tn.3 "incr_index_check $idx 2" [list {*}$res]]
    71         -  #uplevel [list do_test $tn.4 "incr_index_check $idx 5" [list {*}$res]]
           70  +  uplevel [list do_test $tn.3 "incr_index_check $idx 2" [list {*}$res]]
           71  +  uplevel [list do_test $tn.4 "incr_index_check $idx 5" [list {*}$res]]
    72     72   }
           73  +
    73     74   
    74     75   do_execsql_test 1.2 {
    75     76     SELECT errmsg IS NULL, current_key FROM incremental_index_check('i1');
    76     77   } {
    77     78     1 'five',5
    78     79     1 'four',4
    79     80     1 'one',1