/ Check-in [45a0e0fc]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment::-) (CVS 220)
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA1:45a0e0fc8ccde52ac409d1271beaef779fa7eeee
User & Date: drh 2001-05-26 13:15:44
Context
2001-05-28
00:41
:-) (CVS 221) check-in: c8d3bdd9 user: drh tags: trunk
2001-05-26
13:15
:-) (CVS 220) check-in: 45a0e0fc user: drh tags: trunk
2001-05-24
21:06
Continued work on btree (CVS 219) check-in: 18500cdc user: drh tags: trunk
Changes
Hide Diffs Side-by-Side Diffs Ignore Whitespace Patch

Changes to src/btree.c.

    17     17   ** Boston, MA  02111-1307, USA.
    18     18   **
    19     19   ** Author contact information:
    20     20   **   drh@hwaci.com
    21     21   **   http://www.hwaci.com/drh/
    22     22   **
    23     23   *************************************************************************
    24         -** $Id: btree.c,v 1.7 2001/05/24 21:06:35 drh Exp $
           24  +** $Id: btree.c,v 1.8 2001/05/26 13:15:44 drh Exp $
    25     25   */
    26     26   #include "sqliteInt.h"
    27     27   #include "pager.h"
    28     28   #include "btree.h"
    29     29   #include <assert.h>
    30     30   
    31         -/*
    32         -** The maximum number of database entries that can be held in a single
    33         -** page of the database. 
    34         -*/
    35         -#define MX_CELL ((SQLITE_PAGE_SIZE-sizeof(PageHdr))/sizeof(Cell))
    36         -
    37         -/*
    38         -** The maximum amount of data (in bytes) that can be stored locally for a
    39         -** database entry.  If the entry contains more data than this, the
    40         -** extra goes onto overflow pages.
    41         -*/
    42         -#define MX_LOCAL_PAYLOAD \
    43         -  ((SQLITE_PAGE_SIZE-sizeof(PageHdr)-4*(sizeof(Cell)+sizeof(Pgno)))/4)
    44         -
    45         -/*
    46         -** The in-memory image of a disk page has the auxiliary information appended
    47         -** to the end.  EXTRA_SIZE is the number of bytes of space needed to hold
    48         -** that extra information.
    49         -*/
    50         -#define EXTRA_SIZE (sizeof(MemPage)-SQLITE_PAGE_SIZE)
    51         -
    52         -/*
    53         -** Number of bytes on a single overflow page.
    54         -*/
    55         -#define OVERFLOW_SIZE (SQLITE_PAGE_SIZE-sizeof(Pgno))
    56     31   
    57     32   /*
    58     33   ** Primitive data types.  u32 must be 4 bytes and u16 must be 2 bytes.
    59     34   ** Change these typedefs when porting to new architectures.
    60     35   */
    61     36   typedef unsigned int u32;
    62     37   typedef unsigned short int u16;
................................................................................
    64     39   /*
    65     40   ** Forward declarations of structures used only in this file.
    66     41   */
    67     42   typedef struct Page1Header Page1Header;
    68     43   typedef struct MemPage MemPage;
    69     44   typedef struct PageHdr PageHdr;
    70     45   typedef struct Cell Cell;
           46  +typedef struct CellHdr CellHdr;
    71     47   typedef struct FreeBlk FreeBlk;
    72     48   typedef struct OverflowPage OverflowPage;
    73     49   
    74     50   /*
    75     51   ** All structures on a database page are aligned to 4-byte boundries.
    76     52   ** This routine rounds up a number of bytes to the next multiple of 4.
    77     53   **
................................................................................
   114     90   */
   115     91   struct PageHdr {
   116     92     Pgno pgno;      /* Child page that comes after all cells on this page */
   117     93     u16 firstCell;  /* Index in MemPage.aPage[] of the first cell */
   118     94     u16 firstFree;  /* Index in MemPage.aPage[] of the first free block */
   119     95   };
   120     96   
           97  +
           98  +/*
           99  +** Entries on a page of the database are called "Cells".  Each Cell
          100  +** has a header and data.  This structure defines the header.  The
          101  +** definition of the complete Cell including the data is given below.
          102  +*/
          103  +struct CellHdr {
          104  +  Pgno pgno;      /* Child page that comes before this cell */
          105  +  u16 nKey;       /* Number of bytes in the key */
          106  +  u16 iNext;      /* Index in MemPage.aPage[] of next cell in sorted order */
          107  +  u32 nData;      /* Number of bytes of data */
          108  +}
          109  +
          110  +/*
          111  +** The minimum size of a complete Cell.  The Cell must contain a header
          112  +** and at least 4 bytes of data.
          113  +*/
          114  +#define MIN_CELL_SIZE  (sizeof(CellHdr)+4)
          115  +
          116  +/*
          117  +** The maximum number of database entries that can be held in a single
          118  +** page of the database. 
          119  +*/
          120  +#define MX_CELL ((SQLITE_PAGE_SIZE-sizeof(PageHdr))/MIN_CELL_SIZE)
          121  +
          122  +/*
          123  +** The maximum amount of data (in bytes) that can be stored locally for a
          124  +** database entry.  If the entry contains more data than this, the
          125  +** extra goes onto overflow pages.
          126  +*/
          127  +#define MX_LOCAL_PAYLOAD \
          128  +  ((SQLITE_PAGE_SIZE-sizeof(PageHdr))/4-(sizeof(CellHdr)+sizeof(Pgno)))
          129  +
   121    130   /*
   122    131   ** Data on a database page is stored as a linked list of Cell structures.
   123    132   ** Both the key and the data are stored in aData[].  The key always comes
   124    133   ** first.  The aData[] field grows as necessary to hold the key and data,
   125    134   ** up to a maximum of MX_LOCAL_PAYLOAD bytes.  If the size of the key and
   126         -** data combined exceeds MX_LOCAL_PAYLOAD bytes, then the 4 bytes beginning
   127         -** at Cell.aData[MX_LOCAL_PAYLOAD] are the page number of the first overflow
   128         -** page.
          135  +** data combined exceeds MX_LOCAL_PAYLOAD bytes, then Cell.ovfl is the
          136  +** page number of the first overflow page.
          137  +**
          138  +** Though this structure is fixed in size, the Cell on the database
          139  +** page varies in size.  Very cell has a CellHdr and at least 4 bytes
          140  +** of payload space.  Additional payload bytes (up to the maximum of
          141  +** MX_LOCAL_PAYLOAD) and the Cell.ovfl value are allocated only as
          142  +** needed.
   129    143   */
   130    144   struct Cell {
   131         -  Pgno pgno;      /* Child page that comes before this cell */
   132         -  u16 nKey;       /* Number of bytes in the key */
   133         -  u16 iNext;      /* Index in MemPage.aPage[] of next cell in sorted order */
   134         -  u32 nData;      /* Number of bytes of data */
   135         -  char aData[4];  /* Key and data */
          145  +  CellHdr h;                     /* The cell header */
          146  +  char aData[MX_LOCAL_PAYLOAD];  /* Key and data */
          147  +  Pgno ovfl;                     /* The first overflow page */
   136    148   };
   137    149   
   138    150   /*
   139    151   ** Free space on a page is remembered using a linked list of the FreeBlk
   140    152   ** structures.  Space on a database page is allocated in increments of
   141    153   ** at least 4 bytes and is always aligned to a 4-byte boundry.  The
   142    154   ** linked list of freeblocks is always kept in order by address.
   143    155   */
   144    156   struct FreeBlk {
   145    157     u16 iSize;      /* Number of bytes in this block of free space */
   146    158     u16 iNext;      /* Index in MemPage.aPage[] of the next free block */
   147    159   };
   148    160   
          161  +/*
          162  +** Number of bytes on a single overflow page.
          163  +*/
          164  +#define OVERFLOW_SIZE (SQLITE_PAGE_SIZE-sizeof(Pgno))
          165  +
   149    166   /*
   150    167   ** When the key and data for a single entry in the BTree will not fit in
   151    168   ** the MX_LOACAL_PAYLOAD bytes of space available on the database page,
   152    169   ** then all extra data is written to a linked list of overflow pages.
   153    170   ** Each overflow page is an instance of the following structure.
   154    171   **
   155    172   ** Unused pages in the database are also represented by instances of
................................................................................
   190    207     int idxStart;                  /* Index in aPage[] of real data */
   191    208     PageHdr *pStart;               /* Points to aPage[idxStart] */
   192    209     int nFree;                     /* Number of free bytes in aPage[] */
   193    210     int nCell;                     /* Number of entries on this page */
   194    211     Cell *aCell[MX_CELL];          /* All data entires in sorted order */
   195    212   }
   196    213   
          214  +/*
          215  +** The in-memory image of a disk page has the auxiliary information appended
          216  +** to the end.  EXTRA_SIZE is the number of bytes of space needed to hold
          217  +** that extra information.
          218  +*/
          219  +#define EXTRA_SIZE (sizeof(MemPage)-SQLITE_PAGE_SIZE)
          220  +
   197    221   /*
   198    222   ** Everything we need to know about an open database
   199    223   */
   200    224   struct Btree {
   201    225     Pager *pPager;        /* The page cache */
   202    226     BtCursor *pCursor;    /* A list of all open cursors */
   203    227     MemPage *page1;       /* First page of the database */
................................................................................
   213    237   struct BtCursor {
   214    238     Btree *pBt;                     /* The pointer back to the BTree */
   215    239     BtCursor *pPrev, *pNext;        /* List of all cursors */
   216    240     MemPage *pPage;                 /* Page that contains the entry */
   217    241     int idx;                        /* Index of the entry in pPage->aCell[] */
   218    242     int skip_incr;                  /* */
   219    243   };
          244  +
          245  +/*
          246  +** Compute the total number of bytes that a Cell needs on the main
          247  +** database page.  The number returned includes the Cell header, but
          248  +** not any overflow pages.
          249  +*/
          250  +static int cellSize(Cell *pCell){
          251  +  int n = pCell->h.nKey + pCell->h.nData;
          252  +  if( n>MX_LOCAL_PAYLOAD ){
          253  +    n = MX_LOCAL_PAYLOAD + sizeof(Pgno);
          254  +  }else{
          255  +    n = ROUNDUP(n);
          256  +  }
          257  +  n += sizeof(CellHdr);
          258  +  return n;
          259  +}
   220    260   
   221    261   /*
   222    262   ** Defragment the page given.  All Cells are moved to the
   223    263   ** beginning of the page and all free space is collected 
   224    264   ** into one big FreeBlk at the end of the page.
   225    265   */
   226    266   static void defragmentPage(MemPage *pPage){
................................................................................
   230    270     char newPage[SQLITE_PAGE_SIZE];
   231    271   
   232    272     pc = ROUNDUP(pPage->idxStart + sizeof(PageHdr));
   233    273     pPage->pStart->firstCell = pc;
   234    274     memcpy(newPage, pPage->aPage, pc);
   235    275     for(i=0; i<pPage->nCell; i++){
   236    276       Cell *pCell = &pPage->aCell[i];
   237         -    n = pCell->nKey + pCell->nData;
   238         -    if( n>MAX_LOCAL_PAYLOAD ) n = MAX_LOCAL_PAYLOAD + sizeof(Pgno);
   239         -    n = ROUNDUP(n);
   240         -    n += sizeof(Cell) - sizeof(pCell->aData);
   241         -    pCell->iNext = i<pPage->nCell ? pc + n : 0;
          277  +    n = cellSize(pCell);
          278  +    pCell->h.iNext = i<pPage->nCell ? pc + n : 0;
   242    279       memcpy(&newPage[pc], pCell, n);
   243    280       pPage->aCell[i] = (Cell*)&pPage->aPage[pc];
   244    281       pc += n;
   245    282     }
   246    283     assert( pPage->nFree==SQLITE_PAGE_SIZE-pc );
   247    284     memcpy(pPage->aPage, newPage, pc);
   248    285     pFBlk = &pPage->aPage[pc];
................................................................................
   363    400     pPage->isInit = 1;
   364    401     assert( pPage->pParent==0 );
   365    402     pPage->pParent = pParent;
   366    403     if( pParent ) sqlitepager_ref(pParent);
   367    404     pPage->nCell = 0;
   368    405     idx = pPage->pStart->firstCell;
   369    406     while( idx!=0 ){
   370         -    if( idx>SQLITE_PAGE_SIZE-sizeof(Cell) ) goto page_format_error;
          407  +    if( idx>SQLITE_PAGE_SIZE-MN_CELL_SIZE ) goto page_format_error;
   371    408       if( idx<pPage->idxStart + sizeof(PageHeader) ) goto page_format_error;
   372    409       pCell = (Cell*)&pPage->aPage[idx];
   373    410       pPage->aCell[pPage->nCell++] = pCell;
   374         -    idx = pCell->iNext;
          411  +    idx = pCell->h.iNext;
   375    412     }
   376    413     pPage->nFree = 0;
   377    414     idx = pPage->pStart->firstFree;
   378    415     while( idx!=0 ){
   379    416       if( idx>SQLITE_PAGE_SIZE-sizeof(FreeBlk) ) goto page_format_error;
   380    417       if( idx<pPage->idxStart + sizeof(PageHeader) ) goto page_format_error;
   381    418       pFBlk = (FreeBlk*)&pPage->aPage[idx];
................................................................................
   614    651   
   615    652     pPage = pCur->pPage;
   616    653     assert( pPage!=0 );
   617    654     if( pCur->idx >= pPage->nCell ){
   618    655       *pSize = 0;
   619    656     }else{
   620    657       pCell = pPage->aCell[pCur->idx];
   621         -    *psize = pCell->nKey;
          658  +    *psize = pCell->h.nKey;
   622    659     }
   623    660     return SQLITE_OK;
   624    661   }
   625    662   
   626    663   /*
   627    664   ** Read payload information from the entry that the pCur cursor is
   628    665   ** pointing to.  Begin reading the payload at "offset" and read
................................................................................
   692    729     if( amt==0 ) return SQLITE_OK;
   693    730     pPage = pCur->pPage;
   694    731     assert( pPage!=0 );
   695    732     if( pCur->idx >= pPage->nCell ){
   696    733       return SQLITE_ERROR;
   697    734     }
   698    735     pCell = pPage->aCell[pCur->idx];
   699         -  if( amt+offset > pCell->nKey ){
          736  +  if( amt+offset > pCell->h.nKey ){
   700    737     return getPayload(pCur, offset, amt, zBuf);
   701    738   }
   702    739   
   703    740   /*
   704    741   ** Write the number of bytes of data on the entry that the cursor
   705    742   ** is pointing to into *pSize.  Return SQLITE_OK.  Failure is
   706    743   ** not possible.
................................................................................
   711    748   
   712    749     pPage = pCur->pPage;
   713    750     assert( pPage!=0 );
   714    751     if( pCur->idx >= pPage->nCell ){
   715    752       *pSize = 0;
   716    753     }else{
   717    754       pCell = pPage->aCell[pCur->idx];
   718         -    *pSize = pCell->nData;
          755  +    *pSize = pCell->h.nData;
   719    756     }
   720    757     return SQLITE_OK;
   721    758   }
   722    759   
   723    760   /*
   724    761   ** Read part of the data associated with cursor pCur.  A total
   725    762   ** of "amt" bytes will be transfered into zBuf[].  The transfer
................................................................................
   736    773     if( amt==0 ) return SQLITE_OK;
   737    774     pPage = pCur->pPage;
   738    775     assert( pPage!=0 );
   739    776     if( pCur->idx >= pPage->nCell ){
   740    777       return SQLITE_ERROR;
   741    778     }
   742    779     pCell = pPage->aCell[pCur->idx];
   743         -  if( amt+offset > pCell->nKey ){
   744         -  return getPayload(pCur, offset + pCell->nKey, amt, zBuf);
          780  +  if( amt+offset > pCell->h.nKey ){
          781  +  return getPayload(pCur, offset + pCell->h.nKey, amt, zBuf);
   745    782   }
   746    783   
   747    784   /*
   748    785   ** Compare the key for the entry that pCur points to against the 
   749    786   ** given key (pKey,nKeyOrig).  Put the comparison result in *pResult.
   750    787   ** The result is negative if pCur<pKey, zero if they are equal and
   751    788   ** positive if pCur>pKey.
................................................................................
   760    797     int nKey = nKeyOrig;
   761    798     int n;
   762    799     Cell *pCell;
   763    800   
   764    801     assert( pCur->pPage );
   765    802     assert( pCur->idx>=0 && pCur->idx<pCur->pPage->nCell );
   766    803     pCell = &pCur->pPage->aCell[pCur->idx];
   767         -  if( nKey > pCell->nKey ){
   768         -    nKey = pCell->nKey;
          804  +  if( nKey > pCell->h.nKey ){
          805  +    nKey = pCell->h.nKey;
   769    806     }
   770    807     n = nKey;
   771    808     if( n>MX_LOCAL_PAYLOAD ){
   772    809       n = MX_LOCAL_PAYLOAD;
   773    810     }
   774    811     c = memcmp(pCell->aData, pKey, n);
   775    812     if( c!=0 ){
   776    813       *pResult = c;
   777    814       return SQLITE_OK;
   778    815     }
   779    816     pKey += n;
   780    817     nKey -= n;
   781         -  nextPage = *(Pgno*)&pCell->aData[MX_LOCAL_PAYLOAD];
          818  +  nextPage = pCell->ovfl;
   782    819     while( nKey>0 ){
   783    820       OverflowPage *pOvfl;
   784    821       if( nextPage==0 ){
   785    822         return SQLITE_CORRUPT;
   786    823       }
   787    824       rc = sqlitepager_get(pCur->pBt->pPager, nextPage, &pOvfl);
   788    825       if( rc ){
................................................................................
   798    835       if( c!=0 ){
   799    836         *pResult = c;
   800    837         return SQLITE_OK;
   801    838       }
   802    839       nKey -= n;
   803    840       pKey += n;
   804    841     }
   805         -  c = pCell->nKey - nKeyOrig;
          842  +  c = pCell->h.nKey - nKeyOrig;
   806    843     *pResult = c;
   807    844     return SQLITE_OK;
   808    845   }
   809    846   
   810    847   /*
   811    848   ** Move the cursor down to a new child page.
   812    849   */
................................................................................
   840    877       return SQLITE_INTERNAL;
   841    878     }
   842    879     sqlitepager_ref(pParent);
   843    880     sqlitepager_unref(pCur->pPage);
   844    881     pCur->pPage = pParent;
   845    882     pCur->idx = pPage->nCell;
   846    883     for(i=0; i<pPage->nCell; i++){
   847         -    if( pPage->aCell[i].pgno==oldPgno ){
          884  +    if( pPage->aCell[i].h.pgno==oldPgno ){
   848    885         pCur->idx = i;
   849    886         break;
   850    887       }
   851    888     }
   852    889   }
   853    890   
   854    891   /*
................................................................................
   950    987       if( rc ) return rc;
   951    988       pPage = pCur->pPage;
   952    989     }
   953    990     if( pRes ) *pRes = 0;
   954    991     return SQLITE_OK;
   955    992   }
   956    993   
   957         -int sqliteBtreeInsert(BtCursor*, void *pKey, int nKey, void *pData, int nData);
   958         -int sqliteBtreeDelete(BtCursor*);
          994  +/*
          995  +** Allocate a new page from the database file.
          996  +**
          997  +** The new page is marked as dirty.  (In other words, sqlitepager_write()
          998  +** has already been called on the new page.)  The new page has also
          999  +** been referenced and the calling routine is responsible for calling
         1000  +** sqlitepager_unref() on the new page when it is done.
         1001  +**
         1002  +** SQLITE_OK is returned on success.  Any other return value indicates
         1003  +** an error.  *ppPage and *pPgno are undefined in the event of an error.
         1004  +** Do not invoke sqlitepager_unref() on *ppPage if an error is returned.
         1005  +*/
         1006  +static int allocatePage(Btree *pBt, MemPage **ppPage, Pgno *pPgno){
         1007  +  Page1Header *pPage1 = (Page1Header*)pBt->page1;
         1008  +  if( pPage1->freeList ){
         1009  +    OverflowPage *pOvfl;
         1010  +    rc = sqlitepager_write(pPage1);
         1011  +    if( rc ) return rc;
         1012  +    *pPgno = pPage1->freeList;
         1013  +    rc = sqlitepager_get(pBt->pPager, pPage1->freeList, &pOvfl);
         1014  +    if( rc ) return rc;
         1015  +    rc = sqlitepager_write(pOvfl);
         1016  +    if( rc ){
         1017  +      sqlitepager_unref(pOvfl);
         1018  +      return rc;
         1019  +    }
         1020  +    pPage1->freeList = pOvfl->next;
         1021  +    *ppPage = (MemPage*)pOvfl;
         1022  +  }else{
         1023  +    *pPgno = sqlitepager_pagecount(pBt->pPager);
         1024  +    rc = sqlitepager_get(pBt->pPager, *pPgno, ppPage);
         1025  +    if( rc ) return rc;
         1026  +    rc = sqlitepager_write(*ppPage);
         1027  +  }
         1028  +  return rc;
         1029  +}
         1030  +
         1031  +/*
         1032  +** Add a page of the database file to the freelist.  Either pgno or
         1033  +** pPage but not both may be 0. 
         1034  +*/
         1035  +static int freePage(Btree *pBt, void *pPage, Pgno pgno){
         1036  +  Page1Header *pPage1 = (Page1Header*)pBt->page1;
         1037  +  OverflowPage *pOvfl = (OverflowPage*)pPage;
         1038  +  int rc;
         1039  +  int needOvflUnref = 0;
         1040  +  if( pgno==0 ){
         1041  +    assert( pOvfl!=0 );
         1042  +    pgno = sqlitepager_pagenumber(pOvfl);
         1043  +  }
         1044  +  rc = sqlitepager_write(pPage1);
         1045  +  if( rc ){
         1046  +    return rc;
         1047  +  }
         1048  +  if( pOvfl==0 ){
         1049  +    assert( pgno>0 );
         1050  +    rc = sqlitepager_get(pBt->pPager, pgno, &pOvfl);
         1051  +    if( rc ) return rc;
         1052  +    needOvflUnref = 1;
         1053  +  }
         1054  +  rc = sqlitepager_write(pOvfl);
         1055  +  if( rc ){
         1056  +    if( needOvflUnref ) sqlitepager_unref(pOvfl);
         1057  +    return rc;
         1058  +  }
         1059  +  pOvfl->next = pPage1->freeList;
         1060  +  pPage1->freeList = pgno;
         1061  +  memset(pOvfl->aData, 0, OVERFLOW_SIZE);
         1062  +  rc = sqlitepager_unref(pOvfl);
         1063  +  return rc;
         1064  +}
         1065  +
         1066  +/*
         1067  +** Erase all the data out of a cell.  This involves returning overflow
         1068  +** pages back the freelist.
         1069  +*/
         1070  +static int clearCell(Btree *pBt, Cell *pCell){
         1071  +  Pager *pPager = pBt->pPager;
         1072  +  OverflowPage *pOvfl;
         1073  +  Page1Header *pPage1 = (Page1Header*)pBt->page1;
         1074  +  Pgno ovfl, nextOvfl;
         1075  +  int rc;
         1076  +
         1077  +  ovfl = pCell->ovfl;
         1078  +  pCell->ovfl = 0;
         1079  +  while( ovfl ){
         1080  +    rc = sqlitepager_get(pPager, ovfl, &pOvfl);
         1081  +    if( rc ) return rc;
         1082  +    nextOvfl = pOvfl->next;
         1083  +    freePage(pBt, pOvfl, ovfl);
         1084  +    ovfl = nextOvfl;
         1085  +    sqlitepager_unref(pOvfl);
         1086  +  }
         1087  +}
         1088  +
         1089  +/*
         1090  +** Create a new cell from key and data.  Overflow pages are allocated as
         1091  +** necessary and linked to this cell.  
         1092  +*/
         1093  +static int fillInCell(
         1094  +  Btree *pBt,              /* The whole Btree.  Needed to allocate pages */
         1095  +  Cell *pCell,             /* Populate this Cell structure */
         1096  +  void *pKey, int nKey,    /* The key */
         1097  +  void *pData,int nData    /* The data */
         1098  +){
         1099  +  int OverflowPage *pOvfl;
         1100  +  Pgno *pNext;
         1101  +  int spaceLeft;
         1102  +  int n;
         1103  +  int nPayload;
         1104  +  char *pPayload;
         1105  +  char *pSpace;
         1106  +
         1107  +  pCell->h.pgno = 0;
         1108  +  pCell->h.nKey = nKey;
         1109  +  pCell->h.nData = nData;
         1110  +  pCell->h.iNext = 0;
         1111  +
         1112  +  pNext = &pCell->ovfl;
         1113  +  pSpace = pCell->aData;
         1114  +  spaceLeft = MX_LOCAL_PAYLOAD;
         1115  +  pPayload = pKey;
         1116  +  pKey = 0;
         1117  +  nPayload = nKey;
         1118  +  while( nPayload>0 ){
         1119  +    if( spaceLeft==0 ){
         1120  +      rc = allocatePage(pBt, &pOvfl, pNext);
         1121  +      if( rc ){
         1122  +        *pNext = 0;
         1123  +        clearCell(pCell);
         1124  +        return rc;
         1125  +      }
         1126  +      spaceLeft = OVERFLOW_SIZE;
         1127  +      pSpace = pOvfl->aData;
         1128  +      pNextPg = &pOvfl->next;
         1129  +    }
         1130  +    n = nPayload;
         1131  +    if( n>spaceLeft ) n = spaceLeft;
         1132  +    memcpy(pSpace, pPayload, n);
         1133  +    nPayload -= n;
         1134  +    if( nPayload==0 && pData ){
         1135  +      pPayload = pData;
         1136  +      nPayload = nData;
         1137  +      pData = 0;
         1138  +    }else{
         1139  +      pPayload += n;
         1140  +    }
         1141  +    spaceLeft -= n;
         1142  +    pSpace += n;
         1143  +  }
         1144  +  return SQLITE_OK;
         1145  +}
         1146  +
         1147  +/*
         1148  +** Attempt to move N or more bytes out of the page that the cursor
         1149  +** points to into the left sibling page.  (The left sibling page
         1150  +** contains cells that are less than the cells on this page.)  Return
         1151  +** TRUE if successful and FALSE if not.
         1152  +**
         1153  +** Reasons for not being successful include: 
         1154  +**
         1155  +**    (1) there is no left sibling,
         1156  +**    (2) we could only move N-1 bytes or less,
         1157  +**    (3) some kind of file I/O error occurred
         1158  +*/
         1159  +static int rotateLeft(BtCursor *pCur, int N){
         1160  +}
         1161  +
         1162  +/*
         1163  +** Split a single database page into two roughly equal-sized pages.
         1164  +**
         1165  +** The input is an existing page and a new Cell.  The Cell might contain
         1166  +** a valid Cell.pgno field pointing to a child page.
         1167  +**
         1168  +** The output is the Cell that divides the two new pages.  The content
         1169  +** of this divider Cell is written into *pCenter.  pCenter->pgno points
         1170  +** to the new page that was created to hold the smaller half of the
         1171  +** cells from the divided page.  The larger cells from the divided
         1172  +** page are written to a newly allocated page and *ppOut is made to
         1173  +** point to that page.  Except, if ppOut==NULL then the larger cells
         1174  +** remain on pIn.
         1175  +*/
         1176  +static int split(
         1177  +  MemPage *pIn,       /* The page that is to be divided */
         1178  +  Cell *pNewCell,     /* A new cell to add to pIn before dividing it up */
         1179  +  Cell *pCenter,      /* Write the cell that divides the two pages here */
         1180  +  MemPage **ppOut     /* If not NULL, put larger cells in new page at *ppOut */
         1181  +){
         1182  +  
         1183  +}
         1184  +
         1185  +/*
         1186  +** With this routine, we know that the Cell pNewCell will fit into the
         1187  +** database page that pCur points to.  The calling routine has made
         1188  +** sure it will fit.  All this routine needs to do is add the Cell
         1189  +** to the page.
         1190  +*/
         1191  +static int insertCell(BtCursor *pCur, Cell *pNewCell){
         1192  +}
         1193  +
         1194  +/*
         1195  +** Insert pNewCell into the database page that pCur is pointing to.
         1196  +** pNewCell->h.pgno points to a child page that comes before pNewCell->data[],
         1197  +** unless pCur is a leaf page.
         1198  +*/
         1199  +static int addToPage(BtCursor *pCur, Cell *pNewCell){
         1200  +  Cell tempCell;
         1201  +  Cell centerCell;
         1202  +
         1203  +  for(;;){
         1204  +    MemPage *pPage = pCur->pPage;
         1205  +    int sz = cellSize(pNewCell);
         1206  +    if( sz<=pPage->nFree ){
         1207  +      insertCell(pCur, pNewCell);
         1208  +      return SQLITE_OK;
         1209  +    }
         1210  +    if( pPage->pParent==0 ){
         1211  +      MemPage *pRight;
         1212  +      PageHdr *pHdr;
         1213  +      FreeBlk *pFBlk;
         1214  +      int pc;
         1215  +      rc = split(pPage, pNewCell, &centerCell, &pRight);
         1216  +      pHdr = pPage->pStart;
         1217  +      pHdr->pgno = sqlitepager_pagenumber(pRight);
         1218  +      sqlitepager_unref(pRight);
         1219  +      pHdr->firstCell = pc = pPage->idxStart + sizeof(*pHdr);
         1220  +      sz = cellSize(&centerCell);
         1221  +      memcpy(&pPage->aPage[pc], &centerCell, sz);
         1222  +      pc += sz;
         1223  +      pHdr->firstFree = pc;
         1224  +      pFBlk = (FreeBlk*)&pPage->aPage[pc];
         1225  +      pFBlk->iSize = SQLITE_PAGE_SIZE - pc;
         1226  +      pFBlk->iNext = 0;
         1227  +      memset(&pFBlk[1], 0, pFBlk->iSize-sizeof(*pFBlk));
         1228  +      return SQLITE_OK;
         1229  +    }
         1230  +    if( rotateLeft(pCur, sz - pPage->nFree) 
         1231  +           || rotateRight(pCur, sz - pPage->nFree) ){
         1232  +      insertCell(pCur, pNewCell);
         1233  +      return SQLITE_OK;
         1234  +    }
         1235  +    rc = split(pPage, pNewCell, &centerCell, 0);
         1236  +    parentPage(pCur);
         1237  +    tempCell = centerCell;
         1238  +    pNewPage = &tempCell;
         1239  +  }
         1240  +}
         1241  +
         1242  +/*
         1243  +** Insert a new record into the BTree.  The key is given by (pKey,nKey)
         1244  +** and the data is given by (pData,nData).  The cursor is used only to
         1245  +** define what database the record should be inserted into.  The cursor
         1246  +** is NOT left pointing at the new record.
         1247  +*/
         1248  +int sqliteBtreeInsert(
         1249  +  BtCursor *pCur,            /* Insert data into the table of this cursor */
         1250  +  void *pKey,  int nKey,     /* The key of the new record */
         1251  +  void *pData, int nData     /* The data of the new record */
         1252  +){
         1253  +  Cell newCell;
         1254  +  int rc;
         1255  +  int loc;
         1256  +  MemPage *pPage;
         1257  +  Btree *pBt = pCur->pBt;
         1258  +
         1259  +  rc = sqliteBtreeMoveTo(pCur, pKey, nKey, &loc);
         1260  +  if( rc ) return rc;
         1261  +  rc = fillInCell(pBt, &newCell, pKey, nKey, pData, nData);
         1262  +  if( rc ) return rc;
         1263  +  newCell.h.pgno = pCur->pPage->aCell[pCur->idx].h.pgno;
         1264  +  if( loc==0 ){
         1265  +    rc = clearCell(pBt, &pCur->pPage->aCell[pCur->idx]);
         1266  +    if( rc ){
         1267  +      return SQLITE_CORRUPT;
         1268  +    }
         1269  +    unlinkCell(pCur);
         1270  +  }
         1271  +  return addToPage(pCur, &newCell);
         1272  +}
         1273  +
         1274  +/*
         1275  +** Delete the record that the cursor is pointing to.  Leave the cursor
         1276  +** pointing at the next record after the one to which it currently points.
         1277  +** Also, set the pCur->skip_next flag so that the next sqliteBtreeNext() 
         1278  +** called for this cursor will be a no-op.
         1279  +*/
         1280  +int sqliteBtreeDelete(BtCursor *pCur){
         1281  +}