SQLite4
Check-in [ad91883237326dc60ebbf0fc4a897defdfd9d154]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
SHA1 Hash:ad91883237326dc60ebbf0fc4a897defdfd9d154
Date: 2014-01-06 20:30:54
User: dan
Comment:Add merging code.
Tags And Properties
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to src/btInt.h

101
102
103
104
105
106
107
108
109

110
111
112
113
114
115
116
...
205
206
207
208
209
210
211





212
213
214
215
216
217
218
  u32 iMinLevel;                  /* Minimum level of input segments */
  u32 iMaxLevel;                  /* Maximum level of input segments */
  u32 iOutLevel;                  /* Level at which to write output */
  u32 aBlock[32];                 /* Allocated blocks */

  u32 iNextPg;                    /* Page that contains next input key */
  u32 iNextCell;                  /* Cell that contains next input key */
  u32 nUsed;                      /* Number of output blocks actually written */
  u32 iFreeList;                  /* First page of new free-list (if any) */

};

int sqlite4BtMerge(bt_db *db, BtDbHdr *pHdr, u8 *aSched);

/*************************************************************************
** Interface to bt_pager.c functionality.
*/
................................................................................
void sqlite4BtPagerLogsize(BtPager*, int*);
void sqlite4BtPagerMultiproc(BtPager *pPager, int *piVal);
void sqlite4BtPagerLogsizeCb(BtPager *pPager, bt_logsizecb*);
int sqlite4BtPagerCheckpoint(BtPager *pPager, bt_checkpoint*);

int sqlite4BtPagerHdrdump(BtPager *pPager, sqlite4_buffer *pBuf);






/*
** End of bt_pager.c interface.
*************************************************************************/

/*************************************************************************
** File-system interface.
*/







<

>







 







>
>
>
>
>







101
102
103
104
105
106
107

108
109
110
111
112
113
114
115
116
...
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
  u32 iMinLevel;                  /* Minimum level of input segments */
  u32 iMaxLevel;                  /* Maximum level of input segments */
  u32 iOutLevel;                  /* Level at which to write output */
  u32 aBlock[32];                 /* Allocated blocks */

  u32 iNextPg;                    /* Page that contains next input key */
  u32 iNextCell;                  /* Cell that contains next input key */

  u32 iFreeList;                  /* First page of new free-list (if any) */
  u32 aRoot[32];                  /* Root pages for populated blocks */
};

int sqlite4BtMerge(bt_db *db, BtDbHdr *pHdr, u8 *aSched);

/*************************************************************************
** Interface to bt_pager.c functionality.
*/
................................................................................
void sqlite4BtPagerLogsize(BtPager*, int*);
void sqlite4BtPagerMultiproc(BtPager *pPager, int *piVal);
void sqlite4BtPagerLogsizeCb(BtPager *pPager, bt_logsizecb*);
int sqlite4BtPagerCheckpoint(BtPager *pPager, bt_checkpoint*);

int sqlite4BtPagerHdrdump(BtPager *pPager, sqlite4_buffer *pBuf);

/*
** Write a page buffer directly to the database file.
*/
int sqlite4BtPagerRawWrite(BtPager *pPager, u32 pgno, u8 *aBuf);

/*
** End of bt_pager.c interface.
*************************************************************************/

/*************************************************************************
** File-system interface.
*/

Changes to src/bt_main.c

23
24
25
26
27
28
29





30
31
32
33
34
35
36
...
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557


558



559

560
561
562
563
564
565
566
....
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
....
1978
1979
1980
1981
1982
1983
1984



















1985
1986
1987
1988
1989
1990
1991
....
3777
3778
3779
3780
3781
3782
3783
3784
3785
























































































































































































































































































3786
3787
3788
3789
3790
3791
3792
3793
3794
3795


3796

3797
3798
3799
3800
3801











3802

3803
3804
3805
3806
3807
3808

3809
3810
3811
3812


3813
3814
3815
3816
3817
3818
3819
** Values that make up the single byte flags field at the start of
** b-tree pages. 
*/
#define BT_PGFLAGS_INTERNAL 0x01  /* True for non-leaf nodes */
#define BT_PGFLAGS_METATREE 0x02  /* True for a meta-tree page */
#define BT_PGFLAGS_SCHEDULE 0x04  /* True for a schedule-tree page */






/* #define BT_STDERR_DEBUG 1 */

typedef struct BtCursor BtCursor;
typedef struct FiCursor FiCursor;
typedef struct FiSubCursor FiSubCursor;

struct bt_db {
................................................................................
    memcpy(&s, aData, sizeof(s));

    sqlite4BtBufAppendf(pBuf, "  bBusy=%d\n", (int)s.bBusy);
    sqlite4BtBufAppendf(pBuf, "  iAge=%d\n", (int)s.iAge);
    sqlite4BtBufAppendf(pBuf, "  iMinLevel=%d\n", (int)s.iMinLevel);
    sqlite4BtBufAppendf(pBuf, "  iMaxLevel=%d\n", (int)s.iMaxLevel);
    sqlite4BtBufAppendf(pBuf, "  iOutLevel=%d\n", (int)s.iOutLevel);
    sqlite4BtBufAppendf(pBuf, "  blocks=(");
    for(i=0; s.aBlock[i] && i<array_size(s.aBlock); i++){
      sqlite4BtBufAppendf(pBuf, "%s%d", i==0 ? "" : " ", (int)s.aBlock[i]);
    }
    sqlite4BtBufAppendf(pBuf, ")\n");

    sqlite4BtBufAppendf(pBuf, "  iNextPg=%d\n", (int)s.iNextPg);
    sqlite4BtBufAppendf(pBuf, "  iNextCell=%d\n", (int)s.iNextCell);


    sqlite4BtBufAppendf(pBuf, "  nUsed=%d\n", (int)s.nUsed);



    sqlite4BtBufAppendf(pBuf, "  iFreeList=%d\n", (int)s.iFreeList);

  }else{
    sqlite4BtBufAppendf(pBuf, "nCell=%d ", nCell);
    sqlite4BtBufAppendf(pBuf, "iFree=%d ", (int)btFreeOffset(aData, nData));
    sqlite4BtBufAppendf(pBuf, "flags=%d ", (int)btFlags(aData));
    if( btFlags(aData) & BT_PGFLAGS_INTERNAL ){
      sqlite4BtBufAppendf(pBuf, "rchild=%d ", (int)btGetU32(&aData[1]));
    }
................................................................................
      ){
        rc = SQLITE4_NOTFOUND;
      }else{
        rc = btCsrData(&pSub->mcsr, 0, 4, &pV, &nV);
      }
    }
    if( rc==SQLITE4_OK ){
      BtDbHdr *pHdr = sqlite4BtPagerDbhdr(pCsr->base.pDb->pPager);
      pSub->csr.iRoot = sqlite4BtGetU32((const u8*)pV);
      rc = btCsrEnd(&pSub->csr, !bNext);
    }
  }

  return rc;
}
................................................................................
static u8 *btCellFindSize(u8 *aData, int nData, int iCell, int *pnByte){
  u8 *pCell;

  pCell = btCellFind(aData, nData, iCell);
  *pnByte = btCellSize(pCell, 0==(btFlags(aData) & BT_PGFLAGS_INTERNAL));
  return pCell;
}




















/*
** Allocate a new page buffer.
*/
static int btNewBuffer(bt_db *pDb, u8 **paBuf){
  const int pgsz = sqlite4BtPagerPagesize(pDb->pPager);
  u8 *aBuf;
................................................................................

  if( rc==SQLITE4_OK ){
    rc = fiCsrSetCurrent(pCsr);
  }

  return rc;
}

/*
























































































































































































































































































** This is called by a checkpointer to handle a schedule page.
*/
int sqlite4BtMerge(bt_db *db, BtDbHdr *pHdr, u8 *aSched){
  BtSchedule s;                   /* Deserialized schedule object */
  int rc = SQLITE4_OK;            /* Return code */

  /* Set up the input cursor. */
  btReadSchedule(db, aSched, &s);
  if( s.bBusy ){
    FiCursor fcsr;                /* FiCursor used to read input */


    rc = fiSetupMergeCsr(db, pHdr, &s, &fcsr);


    /* The following loop runs once for each key copied from the input to
    ** the output segments. It terminates either when the input is exhausted
    ** or when all available output blocks are full.  */
    while( rc==SQLITE4_OK ){











      rc = fiCsrStep(&fcsr);

    }

    /* Assuming no error has occurred, update the serialized BtSchedule
    ** structure stored in buffer aSched[]. The caller will write this
    ** buffer to the database file as page (pHdr->iSRoot).  */
    if( rc==SQLITE4_OK || rc==SQLITE4_NOTFOUND ){

      rc = SQLITE4_OK;
      btWriteSchedule(aSched, &s, &rc);
    }



    fiCsrReset(&fcsr);
  }
  return rc;
}

/*
** Insert a new key/value pair or replace an existing one.







>
>
>
>
>







 







|







>
>
|
>
>
>
|
>







 







<







 







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







 









>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>










>
>

>





>
>
>
>
>
>
>
>
>
>
>
|
>





|
>
|
|
|
|
>
>







23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
...
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
....
1355
1356
1357
1358
1359
1360
1361

1362
1363
1364
1365
1366
1367
1368
....
1988
1989
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
....
3806
3807
3808
3809
3810
3811
3812
3813
3814
3815
3816
3817
3818
3819
3820
3821
3822
3823
3824
3825
3826
3827
3828
3829
3830
3831
3832
3833
3834
3835
3836
3837
3838
3839
3840
3841
3842
3843
3844
3845
3846
3847
3848
3849
3850
3851
3852
3853
3854
3855
3856
3857
3858
3859
3860
3861
3862
3863
3864
3865
3866
3867
3868
3869
3870
3871
3872
3873
3874
3875
3876
3877
3878
3879
3880
3881
3882
3883
3884
3885
3886
3887
3888
3889
3890
3891
3892
3893
3894
3895
3896
3897
3898
3899
3900
3901
3902
3903
3904
3905
3906
3907
3908
3909
3910
3911
3912
3913
3914
3915
3916
3917
3918
3919
3920
3921
3922
3923
3924
3925
3926
3927
3928
3929
3930
3931
3932
3933
3934
3935
3936
3937
3938
3939
3940
3941
3942
3943
3944
3945
3946
3947
3948
3949
3950
3951
3952
3953
3954
3955
3956
3957
3958
3959
3960
3961
3962
3963
3964
3965
3966
3967
3968
3969
3970
3971
3972
3973
3974
3975
3976
3977
3978
3979
3980
3981
3982
3983
3984
3985
3986
3987
3988
3989
3990
3991
3992
3993
3994
3995
3996
3997
3998
3999
4000
4001
4002
4003
4004
4005
4006
4007
4008
4009
4010
4011
4012
4013
4014
4015
4016
4017
4018
4019
4020
4021
4022
4023
4024
4025
4026
4027
4028
4029
4030
4031
4032
4033
4034
4035
4036
4037
4038
4039
4040
4041
4042
4043
4044
4045
4046
4047
4048
4049
4050
4051
4052
4053
4054
4055
4056
4057
4058
4059
4060
4061
4062
4063
4064
4065
4066
4067
4068
4069
4070
4071
4072
4073
4074
4075
4076
4077
4078
4079
4080
4081
4082
4083
4084
4085
4086
4087
4088
4089
4090
4091
4092
4093
4094
4095
4096
4097
4098
4099
4100
4101
4102
4103
4104
4105
4106
4107
4108
4109
4110
4111
4112
4113
4114
4115
4116
4117
4118
4119
4120
4121
4122
4123
4124
4125
4126
4127
4128
4129
4130
4131
4132
4133
4134
4135
4136
4137
4138
4139
4140
4141
4142
4143
4144
4145
4146
** Values that make up the single byte flags field at the start of
** b-tree pages. 
*/
#define BT_PGFLAGS_INTERNAL 0x01  /* True for non-leaf nodes */
#define BT_PGFLAGS_METATREE 0x02  /* True for a meta-tree page */
#define BT_PGFLAGS_SCHEDULE 0x04  /* True for a schedule-tree page */

/*
** Maximum depth of fast-insert sub-trees.
*/
#define MAX_SUBTREE_DEPTH 8

/* #define BT_STDERR_DEBUG 1 */

typedef struct BtCursor BtCursor;
typedef struct FiCursor FiCursor;
typedef struct FiSubCursor FiSubCursor;

struct bt_db {
................................................................................
    memcpy(&s, aData, sizeof(s));

    sqlite4BtBufAppendf(pBuf, "  bBusy=%d\n", (int)s.bBusy);
    sqlite4BtBufAppendf(pBuf, "  iAge=%d\n", (int)s.iAge);
    sqlite4BtBufAppendf(pBuf, "  iMinLevel=%d\n", (int)s.iMinLevel);
    sqlite4BtBufAppendf(pBuf, "  iMaxLevel=%d\n", (int)s.iMaxLevel);
    sqlite4BtBufAppendf(pBuf, "  iOutLevel=%d\n", (int)s.iOutLevel);
    sqlite4BtBufAppendf(pBuf, "  aBlock=(");
    for(i=0; s.aBlock[i] && i<array_size(s.aBlock); i++){
      sqlite4BtBufAppendf(pBuf, "%s%d", i==0 ? "" : " ", (int)s.aBlock[i]);
    }
    sqlite4BtBufAppendf(pBuf, ")\n");

    sqlite4BtBufAppendf(pBuf, "  iNextPg=%d\n", (int)s.iNextPg);
    sqlite4BtBufAppendf(pBuf, "  iNextCell=%d\n", (int)s.iNextCell);
    sqlite4BtBufAppendf(pBuf, "  iFreeList=%d\n", (int)s.iFreeList);

    sqlite4BtBufAppendf(pBuf, "  aRoot=(");
    for(i=0; s.aBlock[i] && i<array_size(s.aBlock); i++){
      sqlite4BtBufAppendf(pBuf, "%s%d", i==0 ? "" : " ", (int)s.aRoot[i]);
    }
    sqlite4BtBufAppendf(pBuf, ")\n");

  }else{
    sqlite4BtBufAppendf(pBuf, "nCell=%d ", nCell);
    sqlite4BtBufAppendf(pBuf, "iFree=%d ", (int)btFreeOffset(aData, nData));
    sqlite4BtBufAppendf(pBuf, "flags=%d ", (int)btFlags(aData));
    if( btFlags(aData) & BT_PGFLAGS_INTERNAL ){
      sqlite4BtBufAppendf(pBuf, "rchild=%d ", (int)btGetU32(&aData[1]));
    }
................................................................................
      ){
        rc = SQLITE4_NOTFOUND;
      }else{
        rc = btCsrData(&pSub->mcsr, 0, 4, &pV, &nV);
      }
    }
    if( rc==SQLITE4_OK ){

      pSub->csr.iRoot = sqlite4BtGetU32((const u8*)pV);
      rc = btCsrEnd(&pSub->csr, !bNext);
    }
  }

  return rc;
}
................................................................................
static u8 *btCellFindSize(u8 *aData, int nData, int iCell, int *pnByte){
  u8 *pCell;

  pCell = btCellFind(aData, nData, iCell);
  *pnByte = btCellSize(pCell, 0==(btFlags(aData) & BT_PGFLAGS_INTERNAL));
  return pCell;
}

/*
** Return a pointer to and the size of the cell that cursor pCsr currently
** points to.
*/
static void fiCsrCell(FiCursor *pCsr, const void **ppCell, int *pnCell){
  const int pgsz = sqlite4BtPagerPagesize(pCsr->base.pDb->pPager);
  FiSubCursor *pSub;              /* Current sub-cursor */
  u8 *aData;                      /* Current page data */
  int iCell;

  assert( pCsr->iBt>=0 );
  pSub = &pCsr->aSub[pCsr->iBt];
  aData = sqlite4BtPageData(pSub->csr.apPage[pSub->csr.nPg-1]);
  iCell = pSub->csr.aiCell[pSub->csr.nPg-1];

  *ppCell = btCellFindSize(aData, pgsz, iCell, pnCell);
}


/*
** Allocate a new page buffer.
*/
static int btNewBuffer(bt_db *pDb, u8 **paBuf){
  const int pgsz = sqlite4BtPagerPagesize(pDb->pPager);
  u8 *aBuf;
................................................................................

  if( rc==SQLITE4_OK ){
    rc = fiCsrSetCurrent(pCsr);
  }

  return rc;
}

/*
** An object of type FiWriter is used to write to a fast-insert sub-tree.
*/
typedef struct FiWriter FiWriter;
typedef struct FiWriterPg FiWriterPg;
struct FiWriterPg {
  int bAllocated;                 /* True if between Alloc() and Flush() */
  int nCell;                      /* Number of cells on page */
  int iFree;                      /* Free space offset */
  u8 *aBuf;                       /* Buffer to assemble content in */
};
struct FiWriter {
  bt_db *db;                      /* Database handle */
  BtSchedule *pSched;             /* Schedule object being implemented */
  int pgsz;                       /* Page size in bytes */
  int nPgPerBlk;                  /* Pages per block in this database */
  u32 iBlk;                       /* Block to write to */

  int nAlloc;                     /* Pages allocated from current block */
  int nWrite;                     /* Pages written to current block */
  int nHier;                      /* Valid entries in apHier[] array */
  FiWriterPg aHier[MAX_SUBTREE_DEPTH];      /* Path from root to current leaf */
};

/*
** Write the page out to disk.
*/
static int fiWriterFlush(
  FiWriter *p, 
  FiWriterPg *pPg, 
  u32 *pPgno
){
  int rc;                         /* Return code */
  u32 pgno;                       /* New page number */

  assert( pPg->bAllocated==1 );
  assert( p->iBlk>1 );

  pPg->bAllocated = 0;
  pgno = (p->nPgPerBlk * (p->iBlk-1) + 1) + p->nWrite;
  p->nWrite++;
  assert( p->nWrite<=p->nAlloc );
  assert( p->nWrite<=p->nPgPerBlk );
  rc = sqlite4BtPagerRawWrite(p->db->pPager, pgno, pPg->aBuf);
  *pPgno = pgno;
  return rc;
}

static int fiWriterFlushAll(FiWriter *p){
  int i;
  int rc = SQLITE4_OK;
  u32 pgno = 0;

  for(i=0; rc==SQLITE4_OK && i<p->nHier; i++){
    FiWriterPg *pPg = &p->aHier[i];
    if( i!=0 ){
      btPutU32(&pPg->aBuf[1], pgno);
    }
    rc = fiWriterFlush(p, pPg, &pgno);

    btFreeBuffer(p->db, pPg->aBuf);
    pPg->aBuf = 0;
  }

  for(i=0; p->pSched->aBlock[i]!=p->iBlk; i++);
  assert( p->pSched->aRoot[i]==0 );
  p->pSched->aRoot[i] = pgno;

  return rc;
}

static void fiWriterCleanup(FiWriter *p){
}

/*
** Allocate a page buffer to use as part of writing a sub-tree. 
**
** No page number is assigned at this point.
*/
static int fiWriterAlloc(FiWriter *p, int bLeaf, FiWriterPg *pPg){
  int rc = SQLITE4_OK;

  assert( pPg->bAllocated==0 );
  pPg->bAllocated = 1;
  p->nAlloc++;
  pPg->nCell = 0;

  if( pPg->aBuf==0 ){
    rc = btNewBuffer(p->db, &pPg->aBuf);
  }
  if( rc==SQLITE4_OK ){
    if( bLeaf ){
      pPg->iFree = 1;
      pPg->aBuf[0] = 0x00;
    }else{
      pPg->iFree = 5;
      pPg->aBuf[0] = BT_PGFLAGS_INTERNAL;
    }
  }
  return rc;
}

static int fiWriterPush(FiWriter *p, const u8 *pKey, int nKey){
  int rc = SQLITE4_OK;
  int i;                          /* Iterator variable */
  int iIns;                       /* Index in aHier[] to insert new key at */
  int nByte;                      /* Bytes required by new cell */
  int iOff;                       /* Byte offset to write to */
  int nMaxAlloc;                  /* Maximum pages that may be allocated */
  u32 pgno;
  FiWriterPg *pPg;

  nByte = 2 + sqlite4BtVarintLen32(nKey) + nKey + 4;

  nMaxAlloc = MIN((p->nPgPerBlk - p->nAlloc), MAX_SUBTREE_DEPTH);
  for(iIns=1; iIns<p->nHier; iIns++){
    /* Check if the key will fit on page FiWriter.aHier[iIns]. If so,
    ** break out of the loop. */
    int nFree;
    pPg = &p->aHier[iIns];
    nFree = p->pgsz - (pPg->iFree + 6 + pPg->nCell*2);
    if( nFree>=nByte ) break;
  }
  if( iIns==p->nHier ){
    p->nHier = iIns+1;
    rc = fiWriterAlloc(p, 0, &p->aHier[iIns]);
  }

  if( rc==SQLITE4_OK && iIns==nMaxAlloc ){
    rc = BT_BLOCKFULL;
  }

  for(i=0; rc==SQLITE4_OK && i<iIns; i++){
    pPg = &p->aHier[i];
    if( i!=0 ){
      btPutU32(&pPg->aBuf[1], pgno);
    }
    rc = fiWriterFlush(p, pPg, &pgno);
    if( rc==SQLITE4_OK ){
      rc = fiWriterAlloc(p, (i==0), pPg);
    }
  }

  /* Write the b+tree cell containing (pKey/nKey) into page p->aHier[iIns]. */
  if( rc==SQLITE4_OK ){
    pPg = &p->aHier[iIns];
    iOff = pPg->iFree;
    btPutU16(btCellPtrFind(pPg->aBuf, p->pgsz, pPg->nCell), (u16)iOff);
    pPg->nCell++;
    btPutU16(&pPg->aBuf[p->pgsz-2], pPg->nCell);
    iOff += sqlite4BtVarintPut32(&pPg->aBuf[iOff], nKey);
    memcpy(&pPg->aBuf[iOff], pKey, nKey);
    iOff += nKey;
    btPutU32(&pPg->aBuf[iOff], pgno);
    iOff += 4;
    pPg->iFree = iOff;
  }

  return rc;
}


/*
** Initialize a writer object that will be used to implement the schedule
** passed as the second argument.
*/
static void fiWriterInit(
  bt_db *db, 
  BtSchedule *pSched, 
  FiWriter *p,
  int *pRc
){
  if( *pRc==SQLITE4_OK ){
    BtDbHdr *pHdr = sqlite4BtPagerDbhdr(db->pPager);
    int i;

    memset(p, 0, sizeof(FiWriter));
    p->db = db;
    p->pSched = pSched;
    p->pgsz = pHdr->pgsz;
    p->nPgPerBlk = (pHdr->blksz / pHdr->pgsz);

    /* Find a block to write to */
    for(i=0; pSched->aBlock[i]; i++){
      if( pSched->aRoot[i]==0 ) break;
    }
    if( pSched->aBlock[i]==0 ){
      *pRc = BT_BLOCKFULL;
    }else{
      p->iBlk = pSched->aBlock[i];
    }
  }
}


/*
** Argument aBuf points to a buffer containing a leaf cell. This function
** returns a pointer to the key prefix embedded within the cell. Before
** returning, *pnKey is set to the size of the key prefix in bytes.
*/
static const u8 *btKeyPrefixFromCell(const u8 *aBuf, int *pnKey){
  u8 *p = (u8*)aBuf;
  int nKey;

  p += sqlite4BtVarintGet32(p, &nKey);
  if( nKey==0 ){
    p += sqlite4BtVarintGet32(p, &nKey);
  }

  *pnKey = nKey;
  return p;
}

/*
** Return the size in bytes of the shortest prefix of (pNew/nNew) that
** is greater than (pOld/nOld). Or, if that prefix would be too large
** to store on an internal b+tree node, return 
*/
static int btPrefixLength(
  int pgsz, 
  const u8 *pOld, int nOld, 
  const u8 *pNew, int nNew
){
  int nPrefix;
  int nCmp = MIN(nOld, nNew);
  for(nPrefix=0; nPrefix<nCmp && pOld[nPrefix]==pNew[nPrefix]; nPrefix++);
  if( nPrefix>=(pgsz/4) ) return 0;
  assert( nPrefix<nNew );
  return nPrefix+1;
}

static int fiWriterAdd(FiWriter *p, const void *pCell, int nCell){
  int rc = SQLITE4_OK;
  FiWriterPg *pPg;
  int nReq;                       /* Bytes of space required on leaf page */
  int nFree;                      /* Bytes of space available on leaf page */

  if( p->nHier==0 ){
    assert( p->aHier[0].nCell==0 );
    assert( p->aHier[0].iFree==0 );
    p->nHier++;
    rc = fiWriterAlloc(p, 1, &p->aHier[0]);
    assert( rc!=BT_BLOCKFULL );
  }
  pPg = &p->aHier[0];

  /* Calculate the space required for the cell. And space available on
  ** the current leaf page.  */
  nReq = nCell + 2;
  nFree = p->pgsz - pPg->iFree - (6 + 2*(pPg->nCell+1));

  if( nReq>nFree ){
    /* The current leaf page is finished. Cell pCell/nCell will become
    ** the first cell on the next leaf page.  */
    const u8 *pOld; int nOld;     /* Prefix of last key on current leaf */
    const u8 *pNew; int nNew;     /* Prefix of new key */
    int nPrefix;

    pOld = btCellFind(pPg->aBuf, p->pgsz, btCellCount(pPg->aBuf, p->pgsz)-1);
    pOld = btKeyPrefixFromCell(pOld, &nOld);
    pNew = btKeyPrefixFromCell(pCell, &nNew);

    /* Push the shortest prefix of key (pNew/nNew) that is greater than key
    ** (pOld/nOld) up into the b+tree hierarchy.  */
    nPrefix = btPrefixLength(p->pgsz, pOld, nOld, pNew, nNew);
    rc = fiWriterPush(p, pNew, nPrefix);
  }

  /* Write the leaf cell into the page at FiWriter.aHier[0] */
  if( rc==SQLITE4_OK ){
    memcpy(&pPg->aBuf[pPg->iFree], pCell, nCell);
    btPutU16(btCellPtrFind(pPg->aBuf, p->pgsz, pPg->nCell), pPg->iFree);
    pPg->nCell++;
    btPutU16(&pPg->aBuf[p->pgsz-2], pPg->nCell);
    pPg->iFree += nCell;
  }

  return rc;
}

/*
** This is called by a checkpointer to handle a schedule page.
*/
int sqlite4BtMerge(bt_db *db, BtDbHdr *pHdr, u8 *aSched){
  BtSchedule s;                   /* Deserialized schedule object */
  int rc = SQLITE4_OK;            /* Return code */

  /* Set up the input cursor. */
  btReadSchedule(db, aSched, &s);
  if( s.bBusy ){
    FiCursor fcsr;                /* FiCursor used to read input */
    FiWriter writer;              /* FiWriter used to write output */

    rc = fiSetupMergeCsr(db, pHdr, &s, &fcsr);
    fiWriterInit(db, &s, &writer, &rc);

    /* The following loop runs once for each key copied from the input to
    ** the output segments. It terminates either when the input is exhausted
    ** or when all available output blocks are full.  */
    while( rc==SQLITE4_OK ){
      const void *pCell;          /* Cell to copy to output */
      int nCell;                  /* Size of cell in bytes */

      /* Read the current cell from the input and push it to the output. */
      fiCsrCell(&fcsr, &pCell, &nCell);
      rc = fiWriterAdd(&writer, pCell, nCell);
      if( rc==BT_BLOCKFULL ){
        rc = fiWriterFlushAll(&writer);
        fiWriterInit(db, &s, &writer, &rc);
      }
      if( rc==SQLITE4_OK ){
        rc = fiCsrStep(&fcsr);
      }
    }

    /* Assuming no error has occurred, update the serialized BtSchedule
    ** structure stored in buffer aSched[]. The caller will write this
    ** buffer to the database file as page (pHdr->iSRoot).  */
    if( rc==BT_BLOCKFULL || rc==SQLITE4_NOTFOUND ){
      rc = fiWriterFlushAll(&writer);
      if( rc==SQLITE4_OK ){
        btWriteSchedule(aSched, &s, &rc);
      }
    }

    fiWriterCleanup(&writer);
    fiCsrReset(&fcsr);
  }
  return rc;
}

/*
** Insert a new key/value pair or replace an existing one.

Changes to src/bt_pager.c

695
696
697
698
699
700
701






702
703
704
705
706
707
708
    if( iLevel==0 ){
      int rc2 = btCloseReadTransaction(p);
      if( rc==SQLITE4_OK ) rc = rc2;
    }
  }
  return rc;
}







int sqlite4BtPagerRollback(BtPager *p, int iLevel){
  int rc = SQLITE4_OK;

  assert( p->btl.pFd );
  if( p->iTransactionLevel>=iLevel ){








>
>
>
>
>
>







695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
    if( iLevel==0 ){
      int rc2 = btCloseReadTransaction(p);
      if( rc==SQLITE4_OK ) rc = rc2;
    }
  }
  return rc;
}

int sqlite4BtPagerRawWrite(BtPager *p, u32 pgno, u8 *aBuf){
  int pgsz = p->pHdr->pgsz;
  i64 iOff = (i64)pgsz * (i64)(pgno-1);
  return p->btl.pVfs->xWrite(p->btl.pFd, iOff, aBuf, pgsz);
}

int sqlite4BtPagerRollback(BtPager *p, int iLevel){
  int rc = SQLITE4_OK;

  assert( p->btl.pFd );
  if( p->iTransactionLevel>=iLevel ){