/ Check-in [a5076831]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment: Improve the performance of the .recover command.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | dbdata
Files: files | file ages | folders
SHA3-256:a50768314d10d743a0cc013b434b516f0763e0a6c5b79655d8fefde7de53e869
User & Date: dan 2019-04-24 20:48:55
Context
2019-04-25
16:20
Fix a bug preventing .recover from working on databases where the final page of the db is corrupt. check-in: 959bbd11 user: dan tags: dbdata
2019-04-24
20:48
Improve the performance of the .recover command. check-in: a5076831 user: dan tags: dbdata
2019-04-23
20:48
Have ".recover" handle "\r" and "\n" in the same way as ".dump". check-in: f95f0f02 user: dan tags: dbdata
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to ext/misc/dbdata.c.

102
103
104
105
106
107
108

109
110
111
112
113
114
115
...
163
164
165
166
167
168
169



170

171
172
173
174
175
176
177
...
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214


215













216
217
218
219
220
221
222
223
...
232
233
234
235
236
237
238




239

240
241
242
243
244
245
246
...
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573




574
575
576
577

578
579
580
581
582
583
584
  sqlite3_int64 iIntkey;          /* Integer key value */
};

/* The sqlite_dbdata table */
struct DbdataTable {
  sqlite3_vtab base;              /* Base class.  Must be first */
  sqlite3 *db;                    /* The database connection */

  int bPtr;                       /* True for the sqlite_dbptr table, false
                                  ** for the sqlite_dbdata table */
};

#define DBDATA_COLUMN_PGNO        0
#define DBDATA_COLUMN_CELL        1
#define DBDATA_COLUMN_FIELD       2
#define DBDATA_COLUMN_VALUE       3
................................................................................
  return rc;
}

/*
** Disconnect from or destroy a dbdata virtual table.
*/
static int dbdataDisconnect(sqlite3_vtab *pVtab){
  /* Release the table object; this is the only cleanup performed here.
  ** The function always reports SQLITE_OK. */
  sqlite3_free(pVtab);
  return SQLITE_OK;
}

/*
**
** This function interprets two types of constraints:
**
................................................................................
** If neither are present, idxNum is set to 0. If schema=? is present,
** the 0x01 bit in idxNum is set. If pgno=? is present, the 0x02 bit
** in idxNum is set.
**
** If both parameters are present, schema is in position 0 and pgno in
** position 1.
*/
static int dbdataBestIndex(sqlite3_vtab *tab, sqlite3_index_info *pIdxInfo){
  DbdataTable *pTab = (DbdataTable*)tab;
  /* The schema column index depends on which of the two tables this is. */
  const int colSchema = pTab->bPtr ? DBPTR_COLUMN_SCHEMA : DBDATA_COLUMN_SCHEMA;
  int idxSchema = -1;             /* aConstraint[] slot of "schema = ?" */
  int idxPgno = -1;               /* aConstraint[] slot of "pgno = ?" */
  int nArg = 0;                   /* Number of argv[] slots assigned so far */
  int mask = 0;                   /* Value to store in idxNum */
  int k;

  /* Scan the equality constraints.  When several constraints match the
  ** same column, the last one seen is the one that is used. */
  for(k=0; k<pIdxInfo->nConstraint; k++){
    const struct sqlite3_index_constraint *pCons = &pIdxInfo->aConstraint[k];
    if( pCons->op!=SQLITE_INDEX_CONSTRAINT_EQ ) continue;

    if( pCons->iColumn==colSchema ){
      /* A schema constraint that cannot be used rejects this plan. */
      if( !pCons->usable ) return SQLITE_CONSTRAINT;
      idxSchema = k;
    }
    if( pCons->usable && pCons->iColumn==DBDATA_COLUMN_PGNO ){
      idxPgno = k;
    }
  }

  /* schema=? always takes the first argv[] slot; pgno=? takes the next
  ** free one.  Both constraints are fully handled by the table. */
  if( idxSchema>=0 ){
    pIdxInfo->aConstraintUsage[idxSchema].argvIndex = ++nArg;
    pIdxInfo->aConstraintUsage[idxSchema].omit = 1;
    mask |= 0x01;
  }
  if( idxPgno>=0 ){
    pIdxInfo->aConstraintUsage[idxPgno].argvIndex = ++nArg;
    pIdxInfo->aConstraintUsage[idxPgno].omit = 1;
    mask |= 0x02;
  }

  pIdxInfo->idxNum = mask;
  return SQLITE_OK;
}

/*
** Open a new dbdata cursor.
*/
static int dbdataOpen(sqlite3_vtab *pVTab, sqlite3_vtab_cursor **ppCursor){
................................................................................
  }

  *ppCursor = (sqlite3_vtab_cursor *)pCsr;
  return SQLITE_OK;
}

static void dbdataResetCursor(DbdataCursor *pCsr){
  /* Put the scan position back to field 0 of cell 0 on page 1 and clear
  ** the single-page restriction. */
  pCsr->bOnePage = 0;
  pCsr->iField = 0;
  pCsr->iCell = 0;
  pCsr->iPgno = 1;

  /* Destroy the cursor's statement, if any, and forget the handle. */
  sqlite3_finalize(pCsr->pStmt);
  pCsr->pStmt = 0;
}

................................................................................
static int dbdataFilter(
  sqlite3_vtab_cursor *pCursor, 
  int idxNum, const char *idxStr,
  int argc, sqlite3_value **argv
){
  DbdataCursor *pCsr = (DbdataCursor*)pCursor;
  DbdataTable *pTab = (DbdataTable*)pCursor->pVtab;
  int rc;
  const char *zSchema = "main";

  dbdataResetCursor(pCsr);
  assert( pCsr->iPgno==1 );
  if( idxNum & 0x01 ){
    zSchema = (const char*)sqlite3_value_text(argv[0]);
  }
  if( idxNum & 0x02 ){
    pCsr->iPgno = sqlite3_value_int(argv[(idxNum & 0x01)]);
    pCsr->bOnePage = 1;
  }





  rc = sqlite3_prepare_v2(pTab->db, 
      "SELECT data FROM sqlite_dbpage(?) WHERE pgno=?", -1,
      &pCsr->pStmt, 0
  );

  if( rc==SQLITE_OK ){
    rc = sqlite3_bind_text(pCsr->pStmt, 1, zSchema, -1, SQLITE_TRANSIENT);
  }else{
    pTab->base.zErrMsg = sqlite3_mprintf("%s", sqlite3_errmsg(pTab->db));
  }
  if( rc==SQLITE_OK ){
    rc = dbdataNext(pCursor);







>







 







>
>
>
|
>







 







|






|
|












|
|


|
|
>
>
|
>
>
>
>
>
>
>
>
>
>
>
>
>
|







 







>
>
>
>
|
>







 







|












>
>
>
>
|
|
|
|
>







102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
...
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
...
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
...
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
...
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
  sqlite3_int64 iIntkey;          /* Integer key value */
};

/* The sqlite_dbdata table */
struct DbdataTable {
  sqlite3_vtab base;              /* Base class.  Must be first */
  sqlite3 *db;                    /* The database connection */
  sqlite3_stmt *pStmt;            /* For fetching database pages.  Handed
                                  ** over by dbdataResetCursor() when this
                                  ** slot is empty and taken back by the
                                  ** next dbdataFilter(); 0 if none */
  int bPtr;                       /* True for the sqlite_dbptr table, false
                                  ** for the sqlite_dbdata table */
};

#define DBDATA_COLUMN_PGNO        0
#define DBDATA_COLUMN_CELL        1
#define DBDATA_COLUMN_FIELD       2
#define DBDATA_COLUMN_VALUE       3
................................................................................
  return rc;
}

/*
** Disconnect from or destroy a dbdata virtual table.
*/
static int dbdataDisconnect(sqlite3_vtab *pVtab){
  DbdataTable *pTable = (DbdataTable*)pVtab;

  /* Nothing to release when no table object was supplied. */
  if( pTable==0 ) return SQLITE_OK;

  /* Destroy the page-fetch statement held by the table, if there is one,
  ** and then the table object itself. */
  sqlite3_finalize(pTable->pStmt);
  sqlite3_free(pTable);
  return SQLITE_OK;
}

/*
**
** This function interprets two types of constraints:
**
................................................................................
** If neither are present, idxNum is set to 0. If schema=? is present,
** the 0x01 bit in idxNum is set. If pgno=? is present, the 0x02 bit
** in idxNum is set.
**
** If both parameters are present, schema is in position 0 and pgno in
** position 1.
*/
static int dbdataBestIndex(sqlite3_vtab *tab, sqlite3_index_info *pIdx){
  DbdataTable *pTab = (DbdataTable*)tab;
  /* The schema column index depends on which of the two tables this is. */
  const int colSchema = pTab->bPtr ? DBPTR_COLUMN_SCHEMA : DBDATA_COLUMN_SCHEMA;
  int idxSchema = -1;             /* aConstraint[] slot of "schema = ?" */
  int idxPgno = -1;               /* aConstraint[] slot of "pgno = ?" */
  int nArg = 0;                   /* Number of argv[] slots assigned so far */
  int mask = 0;                   /* Value to store in idxNum */
  int k;

  /* Scan the equality constraints.  When several constraints match the
  ** same column, the last one seen is the one that is used. */
  for(k=0; k<pIdx->nConstraint; k++){
    const struct sqlite3_index_constraint *pCons = &pIdx->aConstraint[k];
    if( pCons->op!=SQLITE_INDEX_CONSTRAINT_EQ ) continue;

    if( pCons->iColumn==colSchema ){
      /* A schema constraint that cannot be used rejects this plan. */
      if( !pCons->usable ) return SQLITE_CONSTRAINT;
      idxSchema = k;
    }
    if( pCons->usable && pCons->iColumn==DBDATA_COLUMN_PGNO ){
      idxPgno = k;
    }
  }

  /* schema=? always takes the first argv[] slot. */
  if( idxSchema>=0 ){
    pIdx->aConstraintUsage[idxSchema].argvIndex = ++nArg;
    pIdx->aConstraintUsage[idxSchema].omit = 1;
    mask |= 0x01;
  }

  if( idxPgno<0 ){
    /* No usable pgno=? constraint: report a very large cost and row
    ** estimate for this plan. */
    pIdx->estimatedCost = 100000000;
    pIdx->estimatedRows = 1000000000;
  }else{
    /* pgno=? takes the next free argv[] slot and gets a small cost and
    ** row estimate. */
    pIdx->aConstraintUsage[idxPgno].argvIndex = ++nArg;
    pIdx->aConstraintUsage[idxPgno].omit = 1;
    pIdx->estimatedCost = 100;
    pIdx->estimatedRows =  50;
    mask |= 0x02;

    /* For the sqlite_dbdata table only, an ascending ORDER BY on column 0,
    ** on column 1, or on columns (0, 1) in that order is reported as
    ** consumed.  Columns 0 and 1 are pgno and cell in that table. */
    if( pTab->bPtr==0 && pIdx->nOrderBy!=0 && pIdx->aOrderBy[0].desc==0 ){
      const int iFirst = pIdx->aOrderBy[0].iColumn;
      switch( pIdx->nOrderBy ){
        case 1:
          pIdx->orderByConsumed = (iFirst==0 || iFirst==1);
          break;
        case 2:
          if( iFirst==0 && pIdx->aOrderBy[1].desc==0 ){
            pIdx->orderByConsumed = (pIdx->aOrderBy[1].iColumn==1);
          }
          break;
        default:
          /* Longer ORDER BY clauses leave orderByConsumed untouched. */
          break;
      }
    }
  }

  pIdx->idxNum = mask;
  return SQLITE_OK;
}

/*
** Open a new dbdata cursor.
*/
static int dbdataOpen(sqlite3_vtab *pVTab, sqlite3_vtab_cursor **ppCursor){
................................................................................
  }

  *ppCursor = (sqlite3_vtab_cursor *)pCsr;
  return SQLITE_OK;
}

static void dbdataResetCursor(DbdataCursor *pCsr){
  DbdataTable *pTab = (DbdataTable*)(pCsr->base.pVtab);

  /* Dispose of the cursor's statement.  If the table already holds a
  ** statement, this one is destroyed.  Otherwise it is handed to the
  ** table, where dbdataFilter() can pick it up instead of preparing a
  ** new one. */
  if( pTab->pStmt ){
    sqlite3_finalize(pCsr->pStmt);
  }else{
    pTab->pStmt = pCsr->pStmt;
  }
  pCsr->pStmt = 0;

  /* Put the scan position back to field 0 of cell 0 on page 1 and clear
  ** the single-page restriction. */
  pCsr->bOnePage = 0;
  pCsr->iField = 0;
  pCsr->iCell = 0;
  pCsr->iPgno = 1;
}

................................................................................
static int dbdataFilter(
  sqlite3_vtab_cursor *pCursor, 
  int idxNum, const char *idxStr,
  int argc, sqlite3_value **argv
){
  DbdataCursor *pCsr = (DbdataCursor*)pCursor;
  DbdataTable *pTab = (DbdataTable*)pCursor->pVtab;
  int rc = SQLITE_OK;
  const char *zSchema = "main";

  dbdataResetCursor(pCsr);
  assert( pCsr->iPgno==1 );
  if( idxNum & 0x01 ){
    zSchema = (const char*)sqlite3_value_text(argv[0]);
  }
  if( idxNum & 0x02 ){
    pCsr->iPgno = sqlite3_value_int(argv[(idxNum & 0x01)]);
    pCsr->bOnePage = 1;
  }

  if( pTab->pStmt ){
    pCsr->pStmt = pTab->pStmt;
    pTab->pStmt = 0;
  }else{
    rc = sqlite3_prepare_v2(pTab->db, 
        "SELECT data FROM sqlite_dbpage(?) WHERE pgno=?", -1,
        &pCsr->pStmt, 0
    );
  }
  if( rc==SQLITE_OK ){
    rc = sqlite3_bind_text(pCsr->pStmt, 1, zSchema, -1, SQLITE_TRANSIENT);
  }else{
    pTab->base.zErrMsg = sqlite3_mprintf("%s", sqlite3_errmsg(pTab->db));
  }
  if( rc==SQLITE_OK ){
    rc = dbdataNext(pCursor);

Changes to src/shell.c.in.

6368
6369
6370
6371
6372
6373
6374


6375
6376
6377
6378
6379
6380
6381
....
6455
6456
6457
6458
6459
6460
6461









6462
6463
6464
6465
6466
6467
6468
....
6471
6472
6473
6474
6475
6476
6477
6478
6479
6480
6481

6482
6483
6484
6485
6486
6487
6488
6489
6490
6491
6492
6493
6494
6495
6496
6497



6498
6499
6500
6501


6502
6503
6504
6505
6506
6507
6508
** This function is called to recover data from the database. A script
** to construct a new database containing all recovered data is output
** on stream pState->out.
*/
static int recoverDatabaseCmd(ShellState *pState){
  int rc = SQLITE_OK;
  sqlite3_stmt *pLoop = 0;        /* Loop through all root pages */



  shellExec(pState->db, &rc, 
    /* Attach an in-memory database named 'recovery'. Create an indexed 
    ** cache of the sqlite_dbptr virtual table. */
    "ATTACH '' AS recovery;"
    "CREATE TABLE recovery.dbptr("
    "      pgno, child, PRIMARY KEY(child, pgno)"
................................................................................
      const char *zCreateTable = (const char*)sqlite3_column_text(pStmt, 0);
      raw_printf(pState->out, "CREATE TABLE IF NOT EXISTS %s;\n", 
          &zCreateTable[12]
      );
    }
    shellFinalize(&rc, pStmt);
  }










  /* Loop through each root page. */
  shellPrepare(pState->db, &rc, 
      "SELECT root, intkey, max(maxlen) FROM recovery.map" 
      " WHERE root>1 GROUP BY root, intkey ORDER BY root=("
      "  SELECT rootpage FROM recovery.schema WHERE name='sqlite_sequence'"
      ")", &pLoop
................................................................................
    int iRoot = sqlite3_column_int(pLoop, 0);
    int bIntkey = sqlite3_column_int(pLoop, 1);
    int nCol = sqlite3_column_int(pLoop, 2);
    RecoverTable *pTab;

    pTab = recoverNewTable(pState, &rc, iRoot, bIntkey, nCol);
    if( pTab ){
      sqlite3_stmt *pData = 0;
      if( 0==sqlite3_stricmp(pTab->zName, "sqlite_sequence") ){
        raw_printf(pState->out, "DELETE FROM sqlite_sequence;\n");
      }

      shellPreparePrintf(pState->db, &rc, &pData, 
        "SELECT max(field), group_concat(shell_escape_crnl(quote(value)),', ')"
        "FROM sqlite_dbdata WHERE pgno IN ("
        "  SELECT pgno FROM recovery.map WHERE root=%d"
        ")"
        " AND field!=%d "
        "GROUP BY pgno, cell;", iRoot, pTab->iPk
      );
      while( rc==SQLITE_OK && SQLITE_ROW==sqlite3_step(pData) ){
        int iMax = sqlite3_column_int(pData, 0);
        const char *zVal = (const char*)sqlite3_column_text(pData, 1);
        raw_printf(pState->out, "INSERT INTO %s(%s) VALUES( %s );\n", 
            pTab->zQuoted, pTab->azlCol[iMax>0?iMax:0], zVal
        );
      }
      shellFinalize(&rc, pData);



    }
    recoverFreeTable(pTab);
  }
  shellFinalize(&rc, pLoop);



  /* The rest of the schema */
  if( rc==SQLITE_OK ){
    sqlite3_stmt *pStmt = 0;
    shellPrepare(pState->db, &rc, 
        "SELECT sql, name FROM recovery.schema "
        "WHERE sql NOT LIKE 'create table%'", &pStmt







>
>







 







>
>
>
>
>
>
>
>
>







 







<



>
|
|
|
<
<
<
<
<
|
|
|
|
|
|
|
<
>
>
>




>
>







6368
6369
6370
6371
6372
6373
6374
6375
6376
6377
6378
6379
6380
6381
6382
6383
....
6457
6458
6459
6460
6461
6462
6463
6464
6465
6466
6467
6468
6469
6470
6471
6472
6473
6474
6475
6476
6477
6478
6479
....
6482
6483
6484
6485
6486
6487
6488

6489
6490
6491
6492
6493
6494
6495





6496
6497
6498
6499
6500
6501
6502

6503
6504
6505
6506
6507
6508
6509
6510
6511
6512
6513
6514
6515
6516
6517
6518
** This function is called to recover data from the database. A script
** to construct a new database containing all recovered data is output
** on stream pState->out.
*/
static int recoverDatabaseCmd(ShellState *pState){
  int rc = SQLITE_OK;
  sqlite3_stmt *pLoop = 0;        /* Loop through all root pages */
  sqlite3_stmt *pPages = 0;       /* Loop through all pages in a group */
  sqlite3_stmt *pCells = 0;       /* Loop through all cells in a page */

  shellExec(pState->db, &rc, 
    /* Attach an in-memory database named 'recovery'. Create an indexed 
    ** cache of the sqlite_dbptr virtual table. */
    "ATTACH '' AS recovery;"
    "CREATE TABLE recovery.dbptr("
    "      pgno, child, PRIMARY KEY(child, pgno)"
................................................................................
      const char *zCreateTable = (const char*)sqlite3_column_text(pStmt, 0);
      raw_printf(pState->out, "CREATE TABLE IF NOT EXISTS %s;\n", 
          &zCreateTable[12]
      );
    }
    shellFinalize(&rc, pStmt);
  }

  shellPrepare(pState->db, &rc,
      "SELECT pgno FROM recovery.map WHERE root=?", &pPages
  );
  shellPrepare(pState->db, &rc,
      "SELECT max(field), group_concat(shell_escape_crnl(quote(value)), ', ')"
      "FROM sqlite_dbdata WHERE pgno = ? AND field != ?"
      "GROUP BY cell", &pCells
  );

  /* Loop through each root page. */
  shellPrepare(pState->db, &rc, 
      "SELECT root, intkey, max(maxlen) FROM recovery.map" 
      " WHERE root>1 GROUP BY root, intkey ORDER BY root=("
      "  SELECT rootpage FROM recovery.schema WHERE name='sqlite_sequence'"
      ")", &pLoop
................................................................................
    int iRoot = sqlite3_column_int(pLoop, 0);
    int bIntkey = sqlite3_column_int(pLoop, 1);
    int nCol = sqlite3_column_int(pLoop, 2);
    RecoverTable *pTab;

    pTab = recoverNewTable(pState, &rc, iRoot, bIntkey, nCol);
    if( pTab ){

      if( 0==sqlite3_stricmp(pTab->zName, "sqlite_sequence") ){
        raw_printf(pState->out, "DELETE FROM sqlite_sequence;\n");
      }
      sqlite3_bind_int(pPages, 1, iRoot);
      sqlite3_bind_int(pCells, 2, pTab->iPk);
      while( rc==SQLITE_OK && SQLITE_ROW==sqlite3_step(pPages) ){
        sqlite3_bind_int(pCells, 1, sqlite3_column_int(pPages, 0));





        while( rc==SQLITE_OK && SQLITE_ROW==sqlite3_step(pCells) ){
          int iMax = sqlite3_column_int(pCells, 0);
          const char *zVal = (const char*)sqlite3_column_text(pCells, 1);
          raw_printf(pState->out, "INSERT INTO %s(%s) VALUES( %s );\n", 
              pTab->zQuoted, pTab->azlCol[iMax>0?iMax:0], zVal
          );
        }

        shellReset(&rc, pCells);
      }
      shellReset(&rc, pPages);
    }
    recoverFreeTable(pTab);
  }
  shellFinalize(&rc, pLoop);
  shellFinalize(&rc, pPages);
  shellFinalize(&rc, pCells);

  /* The rest of the schema */
  if( rc==SQLITE_OK ){
    sqlite3_stmt *pStmt = 0;
    shellPrepare(pState->db, &rc, 
        "SELECT sql, name FROM recovery.schema "
        "WHERE sql NOT LIKE 'create table%'", &pStmt