/ Check-in [cf75cac9]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:More speed improvements. (CVS 1381)
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA1:cf75cac9b6bd43e60c6e25042b194ec5c60e5671
User & Date: drh 2004-05-14 19:08:18
Context
2004-05-14
21:12
Cache record headers in the OP_Column opcode. (CVS 1382) check-in: 8d9eab17 user: drh tags: trunk
19:08
More speed improvements. (CVS 1381) check-in: cf75cac9 user: drh tags: trunk
16:50
Optimized varint routines and tests added. (CVS 1380) check-in: d4e0933d user: drh tags: trunk
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to src/btree.c.

5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
...
275
276
277
278
279
280
281














282
283
284
285
286
287
288
...
289
290
291
292
293
294
295


296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
...
671
672
673
674
675
676
677




678

679
680
681
682
683
684
685
....
1370
1371
1372
1373
1374
1375
1376

1377
1378
1379
1380
1381
1382
1383
....
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549






1550



1551
1552
1553


1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
....
1670
1671
1672
1673
1674
1675
1676
1677

1678
1679
1680
1681
1682
1683
1684
1685






1686



1687
1688
1689


1690
1691
1692
1693
1694
1695
1696

1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
....
1746
1747
1748
1749
1750
1751
1752

1753
1754
1755
1756
1757
1758
1759
....
1796
1797
1798
1799
1800
1801
1802

1803
1804
1805
1806
1807
1808
1809
....
1843
1844
1845
1846
1847
1848
1849

1850
1851
1852
1853
1854
1855
1856
....
1894
1895
1896
1897
1898
1899
1900

1901
1902
1903
1904
1905
1906
1907
....
1996
1997
1998
1999
2000
2001
2002

2003
2004
2005
2006
2007
2008
2009
....
2048
2049
2050
2051
2052
2053
2054

2055
2056
2057
2058
2059
2060
2061
....
2085
2086
2087
2088
2089
2090
2091

2092
2093
2094
2095
2096
2097
2098
....
2151
2152
2153
2154
2155
2156
2157

2158
2159
2160
2161
2162
2163
2164
....
2608
2609
2610
2611
2612
2613
2614









































2615
2616
2617
2618
2619
2620
2621
....
2806
2807
2808
2809
2810
2811
2812
2813
2814
2815
2816
2817

2818
2819
2820
2821
2822
2823
2824
....
3117
3118
3119
3120
3121
3122
3123

3124
3125
3126
3127
3128
3129
3130
3131
3132
3133
3134
3135
....
3301
3302
3303
3304
3305
3306
3307

3308
3309
3310
3311
3312
3313
3314
** a legal notice, here is a blessing:
**
**    May you do good and not evil.
**    May you find forgiveness for yourself and forgive others.
**    May you share freely, never taking more than you give.
**
*************************************************************************
** $Id: btree.c,v 1.137 2004/05/14 16:50:06 drh Exp $
**
** This file implements an external (disk-based) database using BTrees.
** For a detailed discussion of BTrees, refer to
**
**     Donald E. Knuth, THE ART OF COMPUTER PROGRAMMING, Volume 3:
**     "Sorting And Searching", pages 473-480. Addison-Wesley
**     Publishing Company, Reading, Massachusetts.
................................................................................
  int minLeaf;          /* Minimum local payload in a LEAFDATA table */
  u8 maxEmbedFrac;      /* Maximum payload as % of total page size */
  u8 minEmbedFrac;      /* Minimum payload as % of total page size */
  u8 minLeafFrac;       /* Minimum leaf payload as % of total page size */
};
typedef Btree Bt;















/*
** A cursor is a pointer to a particular entry in the BTree.
** The entry is identified by its MemPage and the index in
** MemPage.aCell[] of the entry.
*/
struct BtCursor {
  Btree *pBt;               /* The Btree to which this cursor belongs */
................................................................................
  BtCursor *pNext, *pPrev;  /* Forms a linked list of all cursors */
  BtCursor *pShared;        /* Loop of cursors with the same root page */
  int (*xCompare)(void*,int,const void*,int,const void*); /* Key comp func */
  void *pArg;               /* First arg to xCompare() */
  Pgno pgnoRoot;            /* The root page of this tree */
  MemPage *pPage;           /* Page that contains the entry */
  int idx;                  /* Index of the entry in pPage->aCell[] */


  u8 wrFlag;                /* True if writable */
  u8 iMatch;                /* compare result from last sqlite3BtreeMoveto() */
  u8 isValid;               /* TRUE if points to a valid entry */
  u8 status;                /* Set to SQLITE_ABORT if cursors is invalidated */
};

/*
** An instance of the following structure is used to hold information
** about a cell.  The parseCell() function fills the structure in.
**
** The payload routines in this file use the fields as follows: the
** payload begins nHeader bytes past the start of the cell, and when
** part of the payload is not local the 4-byte overflow page number is
** read from offset nLocal past the start of the payload.
*/
typedef struct CellInfo CellInfo;
struct CellInfo {
  i64 nKey;      /* The key for INTKEY tables, or number of bytes in key */
  u32 nData;     /* Number of bytes of data */
  int nHeader;   /* Size of the header in bytes */
  int nLocal;    /* Amount of payload held locally */
  int iOverflow; /* Offset to overflow page number.  Zero if none */
  int nSize;     /* Size of the cell */
};

/*
** Read or write two-, four-, and eight-byte big-endian integer values.
*/
static u32 get2byte(unsigned char *p){
  /* Most significant byte comes first (big-endian). */
  u32 v = p[0];
  v = (v<<8) | p[1];
  return v;
}
static u32 get4byte(unsigned char *p){
................................................................................
** Resize the aCell[] array of the given page so that it is able to
** hold at least nNewSz entries.
**
** Return SQLITE_OK or SQLITE_NOMEM.
*/
static int resizeCellArray(MemPage *pPage, int nNewSz){
  /* Nothing to do when the array already has room for nNewSz entries. */
  if( pPage->nCellAlloc>=nNewSz ){
    return SQLITE_OK;
  }
  pPage->aCell = sqliteRealloc(pPage->aCell, nNewSz*sizeof(pPage->aCell[0]) );
  if( sqlite3_malloc_failed ){
    /* The recorded capacity is left unchanged on allocation failure. */
    return SQLITE_NOMEM;
  }
  pPage->nCellAlloc = nNewSz;
  return SQLITE_OK;
}

/*
................................................................................
    goto create_cursor_exception;
  }
  pCur->xCompare = xCmp ? xCmp : dfltCompare;
  pCur->pArg = pArg;
  pCur->pBt = pBt;
  pCur->wrFlag = wrFlag;
  pCur->idx = 0;

  pCur->pNext = pBt->pCursor;
  if( pCur->pNext ){
    pCur->pNext->pPrev = pCur;
  }
  pCur->pPrev = 0;
  pRing = pBt->pCursor;
  while( pRing && pRing->pgnoRoot!=pCur->pgnoRoot ){ pRing = pRing->pNext; }
................................................................................
){
  unsigned char *aPayload;
  Pgno nextPage;
  int rc;
  MemPage *pPage;
  Btree *pBt;
  int ovflSize;
  CellInfo info;

  assert( pCur!=0 && pCur->pPage!=0 );
  assert( pCur->isValid );
  pBt = pCur->pBt;
  pPage = pCur->pPage;
  pageIntegrity(pPage);
  assert( pCur->idx>=0 && pCur->idx<pPage->nCell );
  aPayload = pPage->aCell[pCur->idx];






  parseCell(pPage, aPayload, &info);



  aPayload += info.nHeader;
  if( pPage->intKey ){
    info.nKey = 0;


  }
  assert( offset>=0 );
  if( skipKey ){
    offset += info.nKey;
  }
  if( offset+amt > info.nKey+info.nData ){
    return SQLITE_ERROR;
  }
  if( offset<info.nLocal ){
    int a = amt;
    if( a+offset>info.nLocal ){
      a = info.nLocal - offset;
    }
    memcpy(pBuf, &aPayload[offset], a);
    if( a==amt ){
      return SQLITE_OK;
    }
    offset = 0;
    pBuf += a;
    amt -= a;
  }else{
    offset -= info.nLocal;
  }
  if( amt>0 ){
    nextPage = get4byte(&aPayload[info.nLocal]);
  }
  ovflSize = pBt->usableSize - 4;
  while( amt>0 && nextPage ){
    rc = sqlite3pager_get(pBt->pPager, nextPage, (void**)&aPayload);
    if( rc!=0 ){
      return rc;
    }
................................................................................
  BtCursor *pCur,      /* Cursor pointing to entry to read from */
  int amt,             /* Amount requested */
  int skipKey          /* read beginning at data if this is true */
){
  unsigned char *aPayload;
  MemPage *pPage;
  Btree *pBt;
  CellInfo info;


  assert( pCur!=0 && pCur->pPage!=0 );
  assert( pCur->isValid );
  pBt = pCur->pBt;
  pPage = pCur->pPage;
  pageIntegrity(pPage);
  assert( pCur->idx>=0 && pCur->idx<pPage->nCell );
  aPayload = pPage->aCell[pCur->idx];






  parseCell(pPage, aPayload, &info);



  aPayload += info.nHeader;
  if( pPage->intKey ){
    info.nKey = 0;


  }
  if( skipKey ){
    aPayload += info.nKey;
    info.nLocal -= info.nKey;
    if( amt<0 ) amt = info.nData;
    assert( amt<=info.nData );
  }else{

    if( amt<0 ) amt = info.nKey;
    assert( amt<=info.nKey );
  }
  if( amt>info.nLocal ){
    return 0;  /* If any of the data is not local, return nothing */
  }
  return aPayload;
}


/*
................................................................................
  pageIntegrity(pNewPage);
  pNewPage->idxParent = pCur->idx;
  pOldPage = pCur->pPage;
  pOldPage->idxShift = 0;
  releasePage(pOldPage);
  pCur->pPage = pNewPage;
  pCur->idx = 0;

  if( pNewPage->nCell<1 ){
    return SQLITE_CORRUPT;
  }
  return SQLITE_OK;
}

/*
................................................................................
  assert( pParent!=0 );
  pageIntegrity(pParent);
  idxParent = pPage->idxParent;
  sqlite3pager_ref(pParent->aData);
  oldPgno = pPage->pgno;
  releasePage(pPage);
  pCur->pPage = pParent;

  assert( pParent->idxShift==0 );
  if( pParent->idxShift==0 ){
    pCur->idx = idxParent;
#ifndef NDEBUG  
    /* Verify that pCur->idx is the correct index to point back to the child
    ** page we just came from 
    */
................................................................................
    pCur->isValid = 0;
    return rc;
  }
  releasePage(pCur->pPage);
  pageIntegrity(pRoot);
  pCur->pPage = pRoot;
  pCur->idx = 0;

  if( pRoot->nCell==0 && !pRoot->leaf ){
    Pgno subpage;
    assert( pRoot->pgno==1 );
    subpage = get4byte(&pRoot->aData[pRoot->hdrOffset+6]);
    assert( subpage>0 );
    pCur->isValid = 1;
    rc = moveToChild(pCur, subpage);
................................................................................
  while( !(pPage = pCur->pPage)->leaf ){
    pgno = get4byte(&pPage->aData[pPage->hdrOffset+6]);
    pCur->idx = pPage->nCell;
    rc = moveToChild(pCur, pgno);
    if( rc ) return rc;
  }
  pCur->idx = pPage->nCell - 1;

  return SQLITE_OK;
}

/* Move the cursor to the first entry in the table.  Return SQLITE_OK
** on success.  Set *pRes to 0 if the cursor actually points to something
** or set *pRes to 1 if the table is empty.
*/
................................................................................
    lwr = 0;
    upr = pPage->nCell-1;
    pageIntegrity(pPage);
    while( lwr<=upr ){
      const void *pCellKey;
      i64 nCellKey;
      pCur->idx = (lwr+upr)/2;

      sqlite3BtreeKeySize(pCur, &nCellKey);
      if( pPage->intKey ){
        if( nCellKey<nKey ){
          c = -1;
        }else if( nCellKey>nKey ){
          c = +1;
        }else{
................................................................................
    if( chldPg==0 ){
      pCur->iMatch = c;
      assert( pCur->idx>=0 && pCur->idx<pCur->pPage->nCell );
      if( pRes ) *pRes = c;
      return SQLITE_OK;
    }
    pCur->idx = lwr;

    rc = moveToChild(pCur, chldPg);
    if( rc ){
      return rc;
    }
  }
  /* NOT REACHED */
}
................................................................................
  if( pCur->isValid==0 ){
    *pRes = 1;
    return SQLITE_OK;
  }
  assert( pPage->isInit );
  assert( pCur->idx<pPage->nCell );
  pCur->idx++;

  if( pCur->idx>=pPage->nCell ){
    if( !pPage->leaf ){
      rc = moveToChild(pCur, get4byte(&pPage->aData[pPage->hdrOffset+6]));
      if( rc ) return rc;
      rc = moveToLeftmost(pCur);
      *pRes = 0;
      return rc;
................................................................................
        *pRes = 1;
        return SQLITE_OK;
      }
      moveToParent(pCur);
      pPage = pCur->pPage;
    }
    pCur->idx--;

    if( pPage->leafData ){
      rc = sqlite3BtreePrevious(pCur, pRes);
    }else{
      rc = SQLITE_OK;
    }
  }
  *pRes = 0;
................................................................................
    put2byte(pPage->aCell[i], pc);
    pageIntegrity(pPage);
  }else{
    pPage->needRelink = 1;
  }
  pPage->idxShift = 1;
}










































/*
** Rebuild the linked list of cells on a page so that the cells
** occur in the order specified by the pPage->aCell[] array.  
** Invoke this routine once to repair damage after one or more
** invocations of either insertCell() or dropCell().
*/
................................................................................
        if( pPage->pgno==1 ){
          rc = initPage(pChild, pPage);
          if( rc ) return rc;
          if( pChild->nFree>=100 ){
            /* The child information will fit on the root page, so do the
            ** copy */
            zeroPage(pPage, pChild->aData[0]);
            resizeCellArray(pPage, pChild->nCell);
            for(i=0; i<pChild->nCell; i++){
              insertCell(pPage, i, pChild->aCell[i], 
                        cellSize(pChild, pChild->aCell[i]), 0);
            }

            freePage(pChild);
            TRACE(("BALANCE: child %d transfer to page 1\n", pChild->pgno));
          }else{
            /* The child has more information that will fit on the root.
            ** The tree is already balanced.  Do nothing. */
            TRACE(("BALANCE: child %d will not fit on page 1\n", pChild->pgno));
          }
................................................................................
  ** Insert divider cells into pParent as necessary.
  */
  j = 0;
  for(i=0; i<nNew; i++){
    MemPage *pNew = apNew[i];
    assert( pNew->pgno==pgnoNew[i] );
    resizeCellArray(pNew, cntNew[i] - j);

    while( j<cntNew[i] ){
      assert( pNew->nFree>=szCell[j] );
      insertCell(pNew, pNew->nCell, apCell[j], szCell[j], 0);
      j++;
    }
    assert( pNew->nCell>0 );
    assert( !pNew->isOverfull );
    relinkCellList(pNew);
    if( i<nNew-1 && j<nCell ){
      u8 *pCell;
      u8 *pTemp;
      int sz;
................................................................................
    szOld = cellSize(pPage, oldCell);
    rc = clearCell(pPage, oldCell);
    if( rc ) return rc;
    dropCell(pPage, pCur->idx, szOld);
  }else if( loc<0 && pPage->nCell>0 ){
    assert( pPage->leaf );
    pCur->idx++;

  }else{
    assert( pPage->leaf );
  }
  insertCell(pPage, pCur->idx, newCell, szNew, 0);
  rc = balance(pPage);
  /* sqlite3BtreePageDump(pCur->pBt, pCur->pgnoRoot, 1); */
  /* fflush(stdout); */







|







 







>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







>
>






<
<
<
<
<
<
<
<
<
<
<
<
<
<







 







>
>
>
>
|
>







 







>







 







|








>
>
>
>
>
>
|
>
>
>
|

|
>
>



|

|


|

|
|









|


|







 







|
>








>
>
>
>
>
>
|
>
>
>
|

|
>
>


|
|
|
|

>
|
|

|







 







>







 







>







 







>







 







>







 







>







 







>







 







>







 







>







 







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







<

<
|

>







 







>
|
<
<
<
<







 







>







5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
...
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
...
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317














318
319
320
321
322
323
324
...
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
....
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
....
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
....
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
....
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
....
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
....
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
....
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
....
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
2044
2045
2046
....
2085
2086
2087
2088
2089
2090
2091
2092
2093
2094
2095
2096
2097
2098
2099
....
2123
2124
2125
2126
2127
2128
2129
2130
2131
2132
2133
2134
2135
2136
2137
....
2190
2191
2192
2193
2194
2195
2196
2197
2198
2199
2200
2201
2202
2203
2204
....
2648
2649
2650
2651
2652
2653
2654
2655
2656
2657
2658
2659
2660
2661
2662
2663
2664
2665
2666
2667
2668
2669
2670
2671
2672
2673
2674
2675
2676
2677
2678
2679
2680
2681
2682
2683
2684
2685
2686
2687
2688
2689
2690
2691
2692
2693
2694
2695
2696
2697
2698
2699
2700
2701
2702
....
2887
2888
2889
2890
2891
2892
2893

2894

2895
2896
2897
2898
2899
2900
2901
2902
2903
2904
....
3197
3198
3199
3200
3201
3202
3203
3204
3205




3206
3207
3208
3209
3210
3211
3212
....
3378
3379
3380
3381
3382
3383
3384
3385
3386
3387
3388
3389
3390
3391
3392
** a legal notice, here is a blessing:
**
**    May you do good and not evil.
**    May you find forgiveness for yourself and forgive others.
**    May you share freely, never taking more than you give.
**
*************************************************************************
** $Id: btree.c,v 1.138 2004/05/14 19:08:18 drh Exp $
**
** This file implements an external (disk-based) database using BTrees.
** For a detailed discussion of BTrees, refer to
**
**     Donald E. Knuth, THE ART OF COMPUTER PROGRAMMING, Volume 3:
**     "Sorting And Searching", pages 473-480. Addison-Wesley
**     Publishing Company, Reading, Massachusetts.
................................................................................
  int minLeaf;          /* Minimum local payload in a LEAFDATA table */
  u8 maxEmbedFrac;      /* Maximum payload as % of total page size */
  u8 minEmbedFrac;      /* Minimum payload as % of total page size */
  u8 minLeafFrac;       /* Minimum leaf payload as % of total page size */
};
typedef Btree Bt;

/*
** An instance of the following structure is used to hold information
** about a cell.  The parseCell() function fills the structure in.
**
** The payload routines in this file use the fields as follows: the
** payload begins nHeader bytes past the start of the cell, and when
** part of the payload is not local the 4-byte overflow page number is
** read from offset nLocal past the start of the payload.
**
** NOTE(review): debug-only code in the cursor payload routines compares
** two CellInfo values with memcmp() over sizeof(CellInfo), so any
** padding bytes in this structure take part in that comparison --
** confirm that parseCell() leaves them in a deterministic state.
*/
typedef struct CellInfo CellInfo;
struct CellInfo {
  i64 nKey;      /* The key for INTKEY tables, or number of bytes in key */
  u32 nData;     /* Number of bytes of data */
  u16 nHeader;   /* Size of the header in bytes */
  u16 nLocal;    /* Amount of payload held locally */
  u16 iOverflow; /* Offset to overflow page number.  Zero if none */
  u16 nSize;     /* Size of the cell */
};

/*
** A cursor is a pointer to a particular entry in the BTree.
** The entry is identified by its MemPage and the index in
** MemPage.aCell[] of the entry.
*/
struct BtCursor {
  Btree *pBt;               /* The Btree to which this cursor belongs */
................................................................................
  BtCursor *pNext, *pPrev;  /* Forms a linked list of all cursors */
  BtCursor *pShared;        /* Loop of cursors with the same root page */
  int (*xCompare)(void*,int,const void*,int,const void*); /* Key comp func */
  void *pArg;               /* First arg to xCompare() */
  Pgno pgnoRoot;            /* The root page of this tree */
  MemPage *pPage;           /* Page that contains the entry */
  int idx;                  /* Index of the entry in pPage->aCell[] */
  CellInfo info;            /* A parse of the cell we are pointing at */
  u8 infoValid;             /* True if information in BtCursor.info is valid */
  u8 wrFlag;                /* True if writable */
  u8 iMatch;                /* compare result from last sqlite3BtreeMoveto() */
  u8 isValid;               /* TRUE if points to a valid entry */
  u8 status;                /* Set to SQLITE_ABORT if cursors is invalidated */
};















/*
** Read or write two-, four-, and eight-byte big-endian integer values.
*/
static u32 get2byte(unsigned char *p){
  /* Most significant byte comes first (big-endian). */
  u32 v = p[0];
  v = (v<<8) | p[1];
  return v;
}
static u32 get4byte(unsigned char *p){
................................................................................
** Resize the aCell[] array of the given page so that it is able to
** hold at least nNewSz entries.
**
** Return SQLITE_OK or SQLITE_NOMEM.
*/
static int resizeCellArray(MemPage *pPage, int nNewSz){
  int nByte;   /* Size in bytes of the enlarged aCell[] array */

  /* Nothing to do when the array already has room for nNewSz entries. */
  if( pPage->nCellAlloc>=nNewSz ){
    return SQLITE_OK;
  }
  nByte = nNewSz*sizeof(pPage->aCell[0]);
  if( pPage->aCell ){
    /* An array already exists: grow it in place. */
    pPage->aCell = sqliteRealloc(pPage->aCell, nByte);
  }else{
    /* First allocation for this page. */
    pPage->aCell = sqliteMallocRaw( nByte );
  }
  if( sqlite3_malloc_failed ){
    /* The recorded capacity is left unchanged on allocation failure. */
    return SQLITE_NOMEM;
  }
  pPage->nCellAlloc = nNewSz;
  return SQLITE_OK;
}

/*
................................................................................
    goto create_cursor_exception;
  }
  pCur->xCompare = xCmp ? xCmp : dfltCompare;
  pCur->pArg = pArg;
  pCur->pBt = pBt;
  pCur->wrFlag = wrFlag;
  pCur->idx = 0;
  pCur->infoValid = 0;
  pCur->pNext = pBt->pCursor;
  if( pCur->pNext ){
    pCur->pNext->pPrev = pCur;
  }
  pCur->pPrev = 0;
  pRing = pBt->pCursor;
  while( pRing && pRing->pgnoRoot!=pCur->pgnoRoot ){ pRing = pRing->pNext; }
................................................................................
){
  unsigned char *aPayload;
  Pgno nextPage;
  int rc;
  MemPage *pPage;
  Btree *pBt;
  int ovflSize;
  u32 nKey;

  assert( pCur!=0 && pCur->pPage!=0 );
  assert( pCur->isValid );
  pBt = pCur->pBt;
  pPage = pCur->pPage;
  pageIntegrity(pPage);
  assert( pCur->idx>=0 && pCur->idx<pPage->nCell );
  aPayload = pPage->aCell[pCur->idx];
  if( !pCur->infoValid ){
    parseCell(pPage, aPayload, &pCur->info);
    pCur->infoValid = 1;
  }else{
#ifndef NDEBUG
    CellInfo info;
    parseCell(pPage, aPayload, &info);
    assert( memcmp(&info, &pCur->info, sizeof(info))==0 );
#endif
  }
  aPayload += pCur->info.nHeader;
  if( pPage->intKey ){
    nKey = 0;
  }else{
    nKey = pCur->info.nKey;
  }
  assert( offset>=0 );
  if( skipKey ){
    offset += nKey;
  }
  if( offset+amt > nKey+pCur->info.nData ){
    return SQLITE_ERROR;
  }
  if( offset<pCur->info.nLocal ){
    int a = amt;
    if( a+offset>pCur->info.nLocal ){
      a = pCur->info.nLocal - offset;
    }
    memcpy(pBuf, &aPayload[offset], a);
    if( a==amt ){
      return SQLITE_OK;
    }
    offset = 0;
    pBuf += a;
    amt -= a;
  }else{
    offset -= pCur->info.nLocal;
  }
  if( amt>0 ){
    nextPage = get4byte(&aPayload[pCur->info.nLocal]);
  }
  ovflSize = pBt->usableSize - 4;
  while( amt>0 && nextPage ){
    rc = sqlite3pager_get(pBt->pPager, nextPage, (void**)&aPayload);
    if( rc!=0 ){
      return rc;
    }
................................................................................
  BtCursor *pCur,      /* Cursor pointing to entry to read from */
  int amt,             /* Amount requested */
  int skipKey          /* read beginning at data if this is true */
){
  unsigned char *aPayload;
  MemPage *pPage;
  Btree *pBt;
  u32 nKey;
  int nLocal;

  assert( pCur!=0 && pCur->pPage!=0 );
  assert( pCur->isValid );
  pBt = pCur->pBt;
  pPage = pCur->pPage;
  pageIntegrity(pPage);
  assert( pCur->idx>=0 && pCur->idx<pPage->nCell );
  aPayload = pPage->aCell[pCur->idx];
  if( !pCur->infoValid ){
    parseCell(pPage, aPayload, &pCur->info);
    pCur->infoValid = 1;
  }else{
#ifndef NDEBUG
    CellInfo info;
    parseCell(pPage, aPayload, &info);
    assert( memcmp(&info, &pCur->info, sizeof(info))==0 );
#endif
  }
  aPayload += pCur->info.nHeader;
  if( pPage->intKey ){
    nKey = 0;
  }else{
    nKey = pCur->info.nKey;
  }
  if( skipKey ){
    aPayload += nKey;
    nLocal = pCur->info.nLocal - nKey;
    if( amt<0 ) amt = pCur->info.nData;
    assert( amt<=pCur->info.nData );
  }else{
    nLocal = pCur->info.nLocal;
    if( amt<0 ) amt = nKey;
    assert( amt<=nKey );
  }
  if( amt>nLocal ){
    return 0;  /* If any of the data is not local, return nothing */
  }
  return aPayload;
}


/*
................................................................................
  pageIntegrity(pNewPage);
  pNewPage->idxParent = pCur->idx;
  pOldPage = pCur->pPage;
  pOldPage->idxShift = 0;
  releasePage(pOldPage);
  pCur->pPage = pNewPage;
  pCur->idx = 0;
  pCur->infoValid = 0;
  if( pNewPage->nCell<1 ){
    return SQLITE_CORRUPT;
  }
  return SQLITE_OK;
}

/*
................................................................................
  assert( pParent!=0 );
  pageIntegrity(pParent);
  idxParent = pPage->idxParent;
  sqlite3pager_ref(pParent->aData);
  oldPgno = pPage->pgno;
  releasePage(pPage);
  pCur->pPage = pParent;
  pCur->infoValid = 0;
  assert( pParent->idxShift==0 );
  if( pParent->idxShift==0 ){
    pCur->idx = idxParent;
#ifndef NDEBUG  
    /* Verify that pCur->idx is the correct index to point back to the child
    ** page we just came from 
    */
................................................................................
    pCur->isValid = 0;
    return rc;
  }
  releasePage(pCur->pPage);
  pageIntegrity(pRoot);
  pCur->pPage = pRoot;
  pCur->idx = 0;
  pCur->infoValid = 0;
  if( pRoot->nCell==0 && !pRoot->leaf ){
    Pgno subpage;
    assert( pRoot->pgno==1 );
    subpage = get4byte(&pRoot->aData[pRoot->hdrOffset+6]);
    assert( subpage>0 );
    pCur->isValid = 1;
    rc = moveToChild(pCur, subpage);
................................................................................
  while( !(pPage = pCur->pPage)->leaf ){
    pgno = get4byte(&pPage->aData[pPage->hdrOffset+6]);
    pCur->idx = pPage->nCell;
    rc = moveToChild(pCur, pgno);
    if( rc ) return rc;
  }
  pCur->idx = pPage->nCell - 1;
  pCur->infoValid = 0;
  return SQLITE_OK;
}

/* Move the cursor to the first entry in the table.  Return SQLITE_OK
** on success.  Set *pRes to 0 if the cursor actually points to something
** or set *pRes to 1 if the table is empty.
*/
................................................................................
    lwr = 0;
    upr = pPage->nCell-1;
    pageIntegrity(pPage);
    while( lwr<=upr ){
      const void *pCellKey;
      i64 nCellKey;
      pCur->idx = (lwr+upr)/2;
      pCur->infoValid = 0;
      sqlite3BtreeKeySize(pCur, &nCellKey);
      if( pPage->intKey ){
        if( nCellKey<nKey ){
          c = -1;
        }else if( nCellKey>nKey ){
          c = +1;
        }else{
................................................................................
    if( chldPg==0 ){
      pCur->iMatch = c;
      assert( pCur->idx>=0 && pCur->idx<pCur->pPage->nCell );
      if( pRes ) *pRes = c;
      return SQLITE_OK;
    }
    pCur->idx = lwr;
    pCur->infoValid = 0;
    rc = moveToChild(pCur, chldPg);
    if( rc ){
      return rc;
    }
  }
  /* NOT REACHED */
}
................................................................................
  if( pCur->isValid==0 ){
    *pRes = 1;
    return SQLITE_OK;
  }
  assert( pPage->isInit );
  assert( pCur->idx<pPage->nCell );
  pCur->idx++;
  pCur->infoValid = 0;
  if( pCur->idx>=pPage->nCell ){
    if( !pPage->leaf ){
      rc = moveToChild(pCur, get4byte(&pPage->aData[pPage->hdrOffset+6]));
      if( rc ) return rc;
      rc = moveToLeftmost(pCur);
      *pRes = 0;
      return rc;
................................................................................
        *pRes = 1;
        return SQLITE_OK;
      }
      moveToParent(pCur);
      pPage = pCur->pPage;
    }
    pCur->idx--;
    pCur->infoValid = 0;
    if( pPage->leafData ){
      rc = sqlite3BtreePrevious(pCur, pRes);
    }else{
      rc = SQLITE_OK;
    }
  }
  *pRes = 0;
................................................................................
    put2byte(pPage->aCell[i], pc);
    pageIntegrity(pPage);
  }else{
    pPage->needRelink = 1;
  }
  pPage->idxShift = 1;
}

/*
** Fill an empty page with nCell cells.  The caller guarantees that the
** combined size of the cells does not exceed the free space on the page.
**
** One region large enough for every cell is obtained from
** allocateSpace() and the cells are copied into it back to back.  As
** each cell is stored, the two-byte link of its predecessor (or the
** link at offset hdrOffset+3 for the first cell) is set to the address
** of the new cell.  The last link written is set to zero.
*/
static void assemblePage(
  MemPage *pPage,   /* The page to be assembled */
  int nCell,        /* The number of cells to add to this page */
  u8 **apCell,      /* Pointers to cell text */
  int *aSize        /* Sizes of the cells */
){
  int k;            /* Index of the cell being processed */
  int nByte = 0;    /* Sum of the sizes of all cells */
  int iCell;        /* Page offset at which the current cell is stored */
  int iLink;        /* Page offset of the link that must point to iCell */
  u8 *aData;        /* Raw content of the page */

  assert( pPage->needRelink==0 );
  assert( pPage->isOverfull==0 );
  for(k=0; k<nCell; k++){
    nByte += aSize[k];
  }
  assert( nByte<=pPage->nFree );
  assert( pPage->nCell==0 );
  resizeCellArray(pPage, nCell);
  iCell = allocateSpace(pPage, nByte);
  aData = pPage->aData;
  iLink = pPage->hdrOffset + 3;
  for(k=0; k<nCell; k++){
    u8 *pDest = &aData[iCell];
    memcpy(pDest, apCell[k], aSize[k]);
    put2byte(&aData[iLink], iCell);
    pPage->aCell[k] = pDest;
    /* The next link to fill in lives at the start of the cell just stored */
    iLink = iCell;
    iCell += aSize[k];
    assert( iCell<=pPage->pBt->usableSize );
  }
  pPage->nCell = nCell;
  put2byte(&aData[iLink], 0);
}

/*
** Rebuild the linked list of cells on a page so that the cells
** occur in the order specified by the pPage->aCell[] array.  
** Invoke this routine once to repair damage after one or more
** invocations of either insertCell() or dropCell().
*/
................................................................................
        if( pPage->pgno==1 ){
          rc = initPage(pChild, pPage);
          if( rc ) return rc;
          if( pChild->nFree>=100 ){
            /* The child information will fit on the root page, so do the
            ** copy */
            zeroPage(pPage, pChild->aData[0]);

            for(i=0; i<pChild->nCell; i++){

              szCell[i]  = cellSize(pChild, pChild->aCell[i]);
            }
            assemblePage(pPage, pChild->nCell, pChild->aCell, szCell);
            freePage(pChild);
            TRACE(("BALANCE: child %d transfer to page 1\n", pChild->pgno));
          }else{
            /* The child has more information that will fit on the root.
            ** The tree is already balanced.  Do nothing. */
            TRACE(("BALANCE: child %d will not fit on page 1\n", pChild->pgno));
          }
................................................................................
  ** Insert divider cells into pParent as necessary.
  */
  j = 0;
  for(i=0; i<nNew; i++){
    MemPage *pNew = apNew[i];
    assert( pNew->pgno==pgnoNew[i] );
    resizeCellArray(pNew, cntNew[i] - j);
    assemblePage(pNew, cntNew[i]-j, &apCell[j], &szCell[j]);
    j = cntNew[i];




    assert( pNew->nCell>0 );
    assert( !pNew->isOverfull );
    relinkCellList(pNew);
    if( i<nNew-1 && j<nCell ){
      u8 *pCell;
      u8 *pTemp;
      int sz;
................................................................................
    szOld = cellSize(pPage, oldCell);
    rc = clearCell(pPage, oldCell);
    if( rc ) return rc;
    dropCell(pPage, pCur->idx, szOld);
  }else if( loc<0 && pPage->nCell>0 ){
    assert( pPage->leaf );
    pCur->idx++;
    pCur->infoValid = 0;
  }else{
    assert( pPage->leaf );
  }
  insertCell(pPage, pCur->idx, newCell, szNew, 0);
  rc = balance(pPage);
  /* sqlite3BtreePageDump(pCur->pBt, pCur->pgnoRoot, 1); */
  /* fflush(stdout); */

Changes to src/vdbeaux.c.

1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
    pMem->flags = MEM_Str;
  }else{
    pMem->flags = MEM_Blob;
  }
  len = sqlite3VdbeSerialTypeLen(serial_type);
  pMem->n = len;
  if( len>NBFS ){
    pMem->z = sqliteMalloc( len );
    if( !pMem->z ){
      return -1;
    }
    pMem->flags |= MEM_Dyn;
  }else{
    pMem->z = pMem->zShort;
    pMem->flags |= MEM_Short;







|







1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
    pMem->flags = MEM_Str;
  }else{
    pMem->flags = MEM_Blob;
  }
  len = sqlite3VdbeSerialTypeLen(serial_type);
  pMem->n = len;
  if( len>NBFS ){
    pMem->z = sqliteMallocRaw( len );
    if( !pMem->z ){
      return -1;
    }
    pMem->flags |= MEM_Dyn;
  }else{
    pMem->z = pMem->zShort;
    pMem->flags |= MEM_Short;