Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Fixes for keys that spill over onto overflow pages.
Downloads: Tarball | ZIP archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA1: bca95ca536ce06b42b53df8d9cd84f110093289d
User & Date: dan 2013-10-11 18:07:09.893
Context
2013-10-11
20:04
Btree fixes so that the lsmtest "data*" tests work. check-in: b0fe8e5b45 user: dan tags: trunk
18:07
Fixes for keys that spill over onto overflow pages. check-in: bca95ca536 user: dan tags: trunk
2013-10-10
19:38
Fixes for overflow pages and very large values. check-in: 74e7bb267f user: dan tags: trunk
Changes
Unified Diff Ignore Whitespace Patch
Changes to src/bt_main.c.
30
31
32
33
34
35
36








37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#endif

/*
** Database handle. Groups the environment pointer with the pager
** through which database pages are accessed.
*/
struct bt_db {
  sqlite4_env *pEnv;              /* SQLite environment */
  BtPager *pPager;                /* Underlying page-based database */
};









/*
** Database cursor handle.
**
** Entries apPage[0] to apPage[nPg-1] are the page references currently
** held by the cursor, and aiCell[i] is the cell selected on apPage[i].
** pVal/nVal describe a buffer used to assemble values that are too large
** to be returned directly from the page.
*/
struct bt_cursor {
  bt_db *pDb;                     /* Database that owns this cursor */

  int nPg;                        /* Number of valid entries in apPage[] */
  int aiCell[BT_MAX_DEPTH];       /* Current cell of each apPage[] entry */
  BtPage *apPage[BT_MAX_DEPTH];   /* All pages from root to current leaf */

  u8 *pVal;                       /* Buffer used to cache large values. */
  int nVal;                       /* Size of buffer pVal in bytes */
};

#ifndef btErrorBkpt
int btErrorBkpt(int rc){
  static int error_cnt = 0;
  error_cnt++;
  return rc;







>
>
>
>
>
>
>
>










<
|







30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54

55
56
57
58
59
60
61
62
#endif

/*
** Database handle. Groups the environment pointer with the pager
** through which database pages are accessed.
*/
struct bt_db {
  sqlite4_env *pEnv;              /* SQLite environment */
  BtPager *pPager;                /* Underlying page-based database */
};

/*
** Overflow cache. Holds a copy of the key and/or value of an entry whose
** data spills onto overflow pages. The key bytes (if any) are stored at
** the start of pBuf, followed immediately by the value bytes.
*/
typedef struct BtOvfl BtOvfl;
struct BtOvfl {
  int nKey;                       /* Bytes of key data at the start of pBuf */
  int nVal;                       /* Bytes of value data following the key */
  u8 *pBuf;                       /* Buffer holding the cached data */
  int nBuf;                       /* Allocated size of pBuf in bytes */
};

/*
** Database cursor handle.
**
** Entries apPage[0] to apPage[nPg-1] are the page references currently
** held by the cursor, and aiCell[i] is the cell selected on apPage[i].
*/
struct bt_cursor {
  bt_db *pDb;                     /* Database that owns this cursor */

  int nPg;                        /* Number of valid entries in apPage[] */
  int aiCell[BT_MAX_DEPTH];       /* Current cell of each apPage[] entry */
  BtPage *apPage[BT_MAX_DEPTH];   /* All pages from root to current leaf */


  BtOvfl ovfl;                    /* Cache for keys/values using overflow pages */
};

#ifndef btErrorBkpt
int btErrorBkpt(int rc){
  static int error_cnt = 0;
  error_cnt++;
  return rc;
183
184
185
186
187
188
189
190
191
192
193
194

195
196
197
198
199
200
201
202
203
204
205
206
207
  }else{
    btCsrSetup(db, pCsr);
  }

  return rc;
}

/*
** Release every page reference held by the cursor and mark the cursor
** as holding no pages.
*/
static void btCsrReset(bt_cursor *pCsr){
  int iPg = 0;
  while( iPg<pCsr->nPg ){
    sqlite4BtPageRelease(pCsr->apPage[iPg]);
    iPg++;
  }
  pCsr->nPg = 0;
}

/*
** Close a cursor: release the pages it holds and free the handle itself.
** A NULL argument is a no-op. Always returns SQLITE4_OK.
*/
int sqlite4BtCsrClose(bt_cursor *pCsr){
  if( pCsr==0 ) return SQLITE4_OK;
  btCsrReset(pCsr);
  sqlite4_free(pCsr->pDb->pEnv, pCsr);
  return SQLITE4_OK;
}

void *sqlite4BtCsrExtra(bt_cursor *pCsr){
  return (void*)&pCsr[1];







|




>





|







190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
  }else{
    btCsrSetup(db, pCsr);
  }

  return rc;
}

/*
** Release every page reference held by the cursor and mark the cursor
** as holding no pages.
**
** If bFreeBuffer is true, the overflow cache buffer is freed as well.
** The cache fields are cleared after the free so that the cursor never
** carries a dangling pointer or a stale buffer size.
*/
static void btCsrReset(bt_cursor *pCsr, int bFreeBuffer){
  int i;
  for(i=0; i<pCsr->nPg; i++){
    sqlite4BtPageRelease(pCsr->apPage[i]);
  }
  if( bFreeBuffer ){
    sqlite4_free(pCsr->pDb->pEnv, pCsr->ovfl.pBuf);
    pCsr->ovfl.pBuf = 0;
    pCsr->ovfl.nBuf = 0;
  }
  pCsr->nPg = 0;
}

/*
** Close a cursor: release the pages it holds, free its overflow buffer
** and then free the handle itself. A NULL argument is a no-op. Always
** returns SQLITE4_OK.
*/
int sqlite4BtCsrClose(bt_cursor *pCsr){
  if( pCsr==0 ) return SQLITE4_OK;
  btCsrReset(pCsr, 1);
  sqlite4_free(pCsr->pDb->pEnv, pCsr);
  return SQLITE4_OK;
}

void *sqlite4BtCsrExtra(bt_cursor *pCsr){
  return (void*)&pCsr[1];
224
225
226
227
228
229
230
231

232
233
234
235
236
237
238
239
}

/*
** Pop the deepest page off the cursor's page stack (if there is one) and
** release the reference to it. Return SQLITE4_NOTFOUND if the stack is
** empty afterwards, or SQLITE4_OK otherwise.
*/
static int btCsrAscend(bt_cursor *pCsr){
  if( pCsr->nPg>0 ){
    BtPage *pTop = pCsr->apPage[--pCsr->nPg];
    sqlite4BtPageRelease(pTop);
  }
  if( pCsr->nPg==0 ) return SQLITE4_NOTFOUND;
  return SQLITE4_OK;
}

/**************************************************************************







|
>
|







232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
}

/*
** Pop up to nLvl pages off the cursor's page stack, releasing the
** reference to each one. Popping stops early if the stack becomes empty.
** Return SQLITE4_NOTFOUND if the stack is empty afterwards, or
** SQLITE4_OK otherwise.
*/
static int btCsrAscend(bt_cursor *pCsr, int nLvl){
  int nLeft = nLvl;
  while( nLeft>0 && pCsr->nPg>0 ){
    pCsr->nPg--;
    sqlite4BtPageRelease(pCsr->apPage[pCsr->nPg]);
    nLeft--;
  }
  if( pCsr->nPg==0 ) return SQLITE4_NOTFOUND;
  return SQLITE4_OK;
}

/**************************************************************************
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
}

/*
** Return the number of bytes between the free-space offset of the page
** and the start of the area at the end of the page that holds three
** 2-byte fields plus one 2-byte entry per cell.
*/
static int btFreeContiguous(const u8 *aData, int nData){
  int nCell = btCellCount(aData, nData);
  int nTail = (3+nCell)*2;
  return nData - btFreeOffset(aData, nData) - nTail;
}

/*
** Return the flags byte of a page, which is the first byte of its data.
** Parameter nData is not used.
*/
static u8 btFlags(const u8 *aData, int nData){
  return *aData;
}

/*
** Return a pointer to cell iCell of the page. The offset of the cell is
** read from a 2-byte slot near the end of the page; slots for successive
** cells are stored at successively lower addresses.
*/
static u8 *btCellFind(u8 *aData, int nData, int iCell){
  u8 *pSlot = &aData[nData - 6 - 2*(iCell+1)];
  int iOff = btGetU16(pSlot);
  return aData + iOff;
}







|







264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
}

/*
** Return the number of bytes between the free-space offset of the page
** and the start of the area at the end of the page that holds three
** 2-byte fields plus one 2-byte entry per cell.
*/
static int btFreeContiguous(const u8 *aData, int nData){
  int nCell = btCellCount(aData, nData);
  int nTail = (3+nCell)*2;
  return nData - btFreeOffset(aData, nData) - nTail;
}

/*
** Return the flags byte of a page, which is the first byte of its data.
*/
static u8 btFlags(const u8 *aData){
  return *aData;
}

/*
** Return a pointer to cell iCell of the page. The offset of the cell is
** read from a 2-byte slot near the end of the page; slots for successive
** cells are stored at successively lower addresses.
*/
static u8 *btCellFind(u8 *aData, int nData, int iCell){
  u8 *pSlot = &aData[nData - 6 - 2*(iCell+1)];
  int iOff = btGetU16(pSlot);
  return aData + iOff;
}
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
#include <stdio.h>
/*
** Debugging aid. Write a human-readable description of a page to stream f:
** a summary line (cell count, free offset, flags, the right-child page
** number for internal pages and the list of cell offsets), followed by
** one line per cell giving the key bytes in hex and, on internal pages,
** the child page number stored after the key.
**
** Only the bytes counted by the first size field of each cell are
** printed as the key.
*/
static void printPage(FILE *f, u32 pgno, u8 *aData, int nData){
  int i;
  int nCell = (int)btCellCount(aData, nData);

  /* pgno is a u32. Cast it, like every other argument here, so that the
  ** argument type matches the %d conversion. */
  fprintf(f, "Page %d: ", (int)pgno);
  fprintf(f, "nCell=%d ", nCell);
  fprintf(f, "iFree=%d ", (int)btFreeOffset(aData, nData));
  fprintf(f, "flags=%d ", (int)btFlags(aData, nData));
  if( btFlags(aData, nData) & BT_PGFLAGS_INTERNAL ){
    fprintf(f, "rchild=%d ", (int)btGetU32(&aData[1]));
  }
  fprintf(f, "cell-offsets=(");
  for(i=0; i<nCell; i++){
    u8 *ptr = btCellPtrFind(aData, nData, i);
    fprintf(f, "%s%d", i==0?"":" ", (int)btGetU16(ptr));
  }
  fprintf(f, ")\n");

  for(i=0; i<nCell; i++){
    int nKey;
    int j;
    u8 *pCell = btCellFind(aData, nData, i);
    fprintf(f, "  Key %d: ", i);
    pCell += sqlite4BtVarintGet32(pCell, &nKey);
    for(j=0; j<nKey; j++){
      fprintf(f, "%02X", (int)pCell[j]);
    }
    if( btFlags(aData, nData) & BT_PGFLAGS_INTERNAL ){
      /* j now indexes the first byte following the key */
      fprintf(f, "  child=%d ", (int)btGetU32(&pCell[j]));
    }
    fprintf(f, "\n");
  }
}
int printPgdataToStderr(u32 pgno, u8 *aData, int nData){
  printPage(stderr, pgno, aData, nData);







|
|


















|







314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
#include <stdio.h>
/*
** Debugging aid. Write a human-readable description of a page to stream f:
** a summary line (cell count, free offset, flags, the right-child page
** number for internal pages and the list of cell offsets), followed by
** one line per cell giving the key bytes in hex and, on internal pages,
** the child page number stored after the key.
**
** Only the bytes counted by the first size field of each cell are
** printed as the key.
*/
static void printPage(FILE *f, u32 pgno, u8 *aData, int nData){
  int i;
  int nCell = (int)btCellCount(aData, nData);

  /* pgno is a u32. Cast it, like every other argument here, so that the
  ** argument type matches the %d conversion. */
  fprintf(f, "Page %d: ", (int)pgno);
  fprintf(f, "nCell=%d ", nCell);
  fprintf(f, "iFree=%d ", (int)btFreeOffset(aData, nData));
  fprintf(f, "flags=%d ", (int)btFlags(aData));
  if( btFlags(aData) & BT_PGFLAGS_INTERNAL ){
    fprintf(f, "rchild=%d ", (int)btGetU32(&aData[1]));
  }
  fprintf(f, "cell-offsets=(");
  for(i=0; i<nCell; i++){
    u8 *ptr = btCellPtrFind(aData, nData, i);
    fprintf(f, "%s%d", i==0?"":" ", (int)btGetU16(ptr));
  }
  fprintf(f, ")\n");

  for(i=0; i<nCell; i++){
    int nKey;
    int j;
    u8 *pCell = btCellFind(aData, nData, i);
    fprintf(f, "  Key %d: ", i);
    pCell += sqlite4BtVarintGet32(pCell, &nKey);
    for(j=0; j<nKey; j++){
      fprintf(f, "%02X", (int)pCell[j]);
    }
    if( btFlags(aData) & BT_PGFLAGS_INTERNAL ){
      /* j now indexes the first byte following the key */
      fprintf(f, "  child=%d ", (int)btGetU32(&pCell[j]));
    }
    fprintf(f, "\n");
  }
}
int printPgdataToStderr(u32 pgno, u8 *aData, int nData){
  printPage(stderr, pgno, aData, nData);
355
356
357
358
359
360
361

362
363
364
365
366
367
368
369
370
371
372
373
374

375
376
377
378
379
380
381
382



383
384
385
386
387
388
389
390
391
392
393
394




395
















































396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426

427
428
429
430
431
432
433
434
435
436

437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
** If the cell key is C, and the user key K, then this function sets:
**
**     *piRes = (C - K).
**
** In other words, *piRes is +ve, zero or -ve if C is respectively larger,
** equal to or smaller than K.
*/

static int btCellKeyCompare(
  bt_cursor *pCsr,                /* Cursor handle */
  const u8 *aData, int nData,     /* Page data (nData is the page size) */
  int iCell,                      /* Cell to compare key from */
  const u8 *pK, int nK,           /* Key to compare to cell key */
  int *piRes                      /* OUT: Result of comparison */
){
  BtPage *pLeaf = 0;              /* Always NULL in this version */
  int nCellKey;                   /* Size of the key stored in the cell */
  int nCmp;                       /* Number of bytes to memcmp() */
  int iCmp;                       /* Comparison result */
  u8 *pKey = btCellFind((u8*)aData, nData, iCell);

  /* Step over the key-size field so that pKey points at the key bytes */
  pKey += sqlite4BtVarintGet32(pKey, &nCellKey);

  /* Cells with a zero key-size field are not handled by this function */
  if( nCellKey==0 ){
    assert( 0 );
  }

  /* Compare the common prefix. If it matches, the shorter key is smaller */
  nCmp = MIN(nCellKey, nK);
  iCmp = memcmp(pKey, pK, nCmp);
  if( iCmp==0 ) iCmp = nCellKey - nK;

  sqlite4BtPageRelease(pLeaf);
  *piRes = iCmp;
  return SQLITE4_OK;
}

static int btCsrSeek(
  bt_cursor *pCsr, 
  const void *pK,                 /* Key to seek for */
  int nK,                         /* Size of key pK in bytes */
  int eSeek,                      /* Seek mode (a BT_SEEK_XXX constant) */
  int bUpdate
){
  const int pgsz = sqlite4BtPagerPagesize(pCsr->pDb->pPager);
  u32 pgno;                       /* Page number for next page to load */
  int rc = SQLITE4_OK;            /* Return Code */

  /* Reset the cursor */
  btCsrReset(pCsr);

  /* Figure out the root page number */
  assert( pCsr->nPg==0 );
  pgno = sqlite4BtPagerRootpgno(pCsr->pDb->pPager);

  while( rc==SQLITE4_OK && pgno ){
    /* Load page number pgno into the b-tree */
    rc = btCsrDescend(pCsr, pgno);
    if( rc==SQLITE4_OK ){
      int nCell;                  /* Number of cells on this page */
      int iHi;                    /* pK/nK is <= than cell iHi */
      int iLo;                    /* pK/nK is > than cell (iLo-1) */
      int res;                    /* Result of comparison */
      u8 *aData = (u8*)sqlite4BtPageData(pCsr->apPage[pCsr->nPg-1]);


      iLo = 0;
      iHi = nCell = btCellCount(aData, pgsz);
      if( nCell==0 ){
        rc = SQLITE4_NOTFOUND;
        break;
      }

      while( iHi>iLo ){
        int iTst = (iHi+iLo)/2;   /* Cell to compare to pK/nK */

        rc = btCellKeyCompare(pCsr, aData, pgsz, iTst, pK, nK, &res);
        if( rc!=SQLITE4_OK || res==0 ){
          /* Cell iTst is EQUAL to pK/nK */
          iHi = iLo = iTst;
        }else if( res<0 ){
          /* Cell iTst is SMALLER than pK/nK */
          iLo = iTst+1;
        }else{
          /* Cell iTst is LARGER than pK/nK */
          iHi = iTst;
        }
      }
      if( rc!=SQLITE4_OK ) break;
      assert( iHi==iLo );
      pCsr->aiCell[pCsr->nPg-1] = iHi;

#if 0
printPage(stderr, pgno, aData, pgsz);
#endif

      if( aData[0] & BT_PGFLAGS_INTERNAL ){
        pgno = btChildPgno(aData, pgsz, iHi);
      }else{
        pgno = 0;

        if( res!=0 ){
          assert( BT_SEEK_LEFAST<0 && BT_SEEK_LE<0 );
          if( eSeek<0 ){







>













>



<
<

|
|
>
>
>



<








>
>
>
>
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
















|














>










>
|



















|







364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388


389
390
391
392
393
394
395
396
397

398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
** If the cell key is C, and the user key K, then this function sets:
**
**     *piRes = (C - K).
**
** In other words, *piRes is +ve, zero or -ve if C is respectively larger,
** equal to or smaller than K.
*/
#if 0
/*
** Disabled earlier version of btCellKeyCompare(), which compared against
** a cell identified by page data and cell index. It is excluded from
** compilation by the enclosing "#if 0" and is not valid C as written:
** the if statement inside the nCellKey==0 branch has two else clauses.
*/
static int btCellKeyCompare(
  bt_cursor *pCsr,                /* Cursor handle */
  const u8 *aData, int nData,     /* Page data (nData is the page size) */
  int iCell,                      /* Cell to compare key from */
  const u8 *pK, int nK,           /* Key to compare to cell key */
  int *piRes                      /* OUT: Result of comparison */
){
  BtPage *pLeaf = 0;              /* Leaf page reference to release */
  int rc = SQLITE4_OK;
  int nCellKey;                   /* Number of bytes in cell key */
  int res;
  u8 *pCell = btCellFind((u8*)aData, nData, iCell);
  int nCmp;
  int nAscend = 0;

  pCell += sqlite4BtVarintGet32(pCell, &nCellKey);
  if( nCellKey==0 ){


    if( (btFlags(aData, nData) & BT_PGFLAGS_INTERNAL)==0 ){
      rc = sqlite4BtCsrKey(pCsr, &pCell, &nCellKey);
    }else{
      rc = btOverflowBuffer(pCsr, 0);
      if( rc!=SQLITE4_OK ) return rc;
      pK = pCsr->ovfl.pBuf;
    }else{
      assert( 0 );
    }

  }

  nCmp = MIN(nCellKey, nK);
  res = memcmp(pCell, pK, nCmp);
  if( res==0 ){
    res = nCellKey - nK;
  }

  *piRes = res;
  return rc;
}
#endif

/*
** Compare the key of the cell that the cursor currently points to (cell
** aiCell[nPg-1] of page apPage[nPg-1]) with the key pK/nK, and write the
** result to *piRes as described above.
**
** If bLeaf is true the cursor key is read with sqlite4BtCsrKey().
** Otherwise the key is read from the internal cell. If the key-size
** field of that cell is zero, the cursor temporarily descends to the
** last cell of the right-most leaf of the cell's child sub-tree and the
** key is read from there. Before returning, the cursor is moved back up
** by the number of levels descended.
**
** On error *piRes is set to 0 and an error code is returned.
*/
static int btCellKeyCompare(
  bt_cursor *pCsr,                /* Cursor handle */
  int bLeaf,                      /* True if the current page is a leaf */
  const void *pK,                 /* Key to compare the cell key against */
  int nK,                         /* Size of pK in bytes */
  int *piRes                      /* OUT: Result of comparison */
){
  const void *pCsrKey = 0;        /* Key read via the cursor */
  int nCsrKey = 0;                /* Size of pCsrKey in bytes */
  int nAscend = 0;                /* Levels to ascend before returning */
  int rc = SQLITE4_OK;
  int res = 0;                    /* Defined even if an error occurs */

  if( bLeaf ){
    rc = sqlite4BtCsrKey(pCsr, &pCsrKey, &nCsrKey);
  }else{
    const int pgsz = sqlite4BtPagerPagesize(pCsr->pDb->pPager);

    u8 *aData = sqlite4BtPageData(pCsr->apPage[pCsr->nPg-1]);
    u8 *pCell = btCellFind(aData, pgsz, pCsr->aiCell[pCsr->nPg-1]);

    pCsrKey = pCell + sqlite4BtVarintGet32(pCell, &nCsrKey);
    if( nCsrKey==0 ){
      /* The key is not stored in this cell. Follow the child pointer of
      ** the current cell, then the right-most pointer of each internal
      ** page below it, until a leaf is reached. */
      while( 1 ){
        u32 pgno;
        aData = sqlite4BtPageData(pCsr->apPage[pCsr->nPg-1]);
        pgno = btChildPgno(aData, pgsz, pCsr->aiCell[pCsr->nPg-1]);
        /* NOTE(review): the counter is incremented before the descent,
        ** as in the original code. Confirm that btCsrAscend() may safely
        ** pop this level when btCsrDescend() fails. */
        nAscend++;
        rc = btCsrDescend(pCsr, pgno);
        if( rc!=SQLITE4_OK ) break;
        aData = sqlite4BtPageData(pCsr->apPage[pCsr->nPg-1]);
        pCsr->aiCell[pCsr->nPg-1] = btCellCount(aData, pgsz);
        if( (btFlags(aData) & BT_PGFLAGS_INTERNAL)==0 ) break;
      }

      /* Only read the leaf key if the descent succeeded. Otherwise the
      ** error code would be overwritten and an unrelated cell index
      ** modified. */
      if( rc==SQLITE4_OK ){
        pCsr->aiCell[pCsr->nPg-1]--;
        rc = sqlite4BtCsrKey(pCsr, &pCsrKey, &nCsrKey);
      }
    }
  }

  if( rc==SQLITE4_OK ){
    int nCmp = MIN(nCsrKey, nK);
    res = memcmp(pCsrKey, pK, nCmp);
    if( res==0 ){
      res = nCsrKey - nK;
    }
  }

  btCsrAscend(pCsr, nAscend);
  *piRes = res;
  return rc;
}

static int btCsrSeek(
  bt_cursor *pCsr, 
  const void *pK,                 /* Key to seek for */
  int nK,                         /* Size of key pK in bytes */
  int eSeek,                      /* Seek mode (a BT_SEEK_XXX constant) */
  int bUpdate
){
  const int pgsz = sqlite4BtPagerPagesize(pCsr->pDb->pPager);
  u32 pgno;                       /* Page number for next page to load */
  int rc = SQLITE4_OK;            /* Return Code */

  /* Reset the cursor */
  btCsrReset(pCsr, 0);

  /* Figure out the root page number */
  assert( pCsr->nPg==0 );
  pgno = sqlite4BtPagerRootpgno(pCsr->pDb->pPager);

  while( rc==SQLITE4_OK && pgno ){
    /* Load page number pgno into the b-tree */
    rc = btCsrDescend(pCsr, pgno);
    if( rc==SQLITE4_OK ){
      int nCell;                  /* Number of cells on this page */
      int iHi;                    /* pK/nK is <= than cell iHi */
      int iLo;                    /* pK/nK is > than cell (iLo-1) */
      int res;                    /* Result of comparison */
      u8 *aData = (u8*)sqlite4BtPageData(pCsr->apPage[pCsr->nPg-1]);
      int bLeaf = ((btFlags(aData) & BT_PGFLAGS_INTERNAL)==0);

      iLo = 0;
      iHi = nCell = btCellCount(aData, pgsz);
      if( nCell==0 ){
        rc = SQLITE4_NOTFOUND;
        break;
      }

      while( iHi>iLo ){
        int iTst = (iHi+iLo)/2;   /* Cell to compare to pK/nK */
        pCsr->aiCell[pCsr->nPg-1] = iTst;
        rc = btCellKeyCompare(pCsr, bLeaf, pK, nK, &res);
        if( rc!=SQLITE4_OK || res==0 ){
          /* Cell iTst is EQUAL to pK/nK */
          iHi = iLo = iTst;
        }else if( res<0 ){
          /* Cell iTst is SMALLER than pK/nK */
          iLo = iTst+1;
        }else{
          /* Cell iTst is LARGER than pK/nK */
          iHi = iTst;
        }
      }
      if( rc!=SQLITE4_OK ) break;
      assert( iHi==iLo );
      pCsr->aiCell[pCsr->nPg-1] = iHi;

#if 0
printPage(stderr, pgno, aData, pgsz);
#endif

      if( bLeaf==0 ){
        pgno = btChildPgno(aData, pgsz, iHi);
      }else{
        pgno = 0;

        if( res!=0 ){
          assert( BT_SEEK_LEFAST<0 && BT_SEEK_LE<0 );
          if( eSeek<0 ){
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
*/
static int btCsrEnd(bt_cursor *pCsr, int bLast){
  const int pgsz = sqlite4BtPagerPagesize(pCsr->pDb->pPager);
  int rc;                         /* Return Code */
  u32 pgno;                       /* Page number for next page to load */

  /* Reset the cursor */
  btCsrReset(pCsr);

  /* Figure out the root page number */
  assert( pCsr->nPg==0 );
  pgno = sqlite4BtPagerRootpgno(pCsr->pDb->pPager);

  while( rc==SQLITE4_OK ){
    /* Load page number pgno into the b-tree */







|







563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
*/
static int btCsrEnd(bt_cursor *pCsr, int bLast){
  const int pgsz = sqlite4BtPagerPagesize(pCsr->pDb->pPager);
  int rc;                         /* Return Code */
  u32 pgno;                       /* Page number for next page to load */

  /* Reset the cursor */
  btCsrReset(pCsr, 0);

  /* Figure out the root page number */
  assert( pCsr->nPg==0 );
  pgno = sqlite4BtPagerRootpgno(pCsr->pDb->pPager);

  while( rc==SQLITE4_OK ){
    /* Load page number pgno into the b-tree */
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
    }else{
      if( pCsr->aiCell[iPg]>0 ){
        pCsr->aiCell[iPg]--;
        break;
      }
    }

    rc = btCsrAscend(pCsr);
    bRequireDescent = 1;
  }

  if( bRequireDescent && rc==SQLITE4_OK ){
    u32 pgno;                   /* Child page number */
    u8 *aData = (u8*)sqlite4BtPageData(pCsr->apPage[pCsr->nPg-1]);

    pgno = btChildPgno(aData, pgsz, pCsr->aiCell[pCsr->nPg-1]);

    while( 1 ){
      rc = btCsrDescend(pCsr, pgno);
      if( rc!=SQLITE4_OK ){
        break;
      }else{
        int nCell;
        aData = (u8*)sqlite4BtPageData(pCsr->apPage[pCsr->nPg-1]);
        nCell = btCellCount(aData, pgsz);
        if( btFlags(aData, pgsz) & BT_PGFLAGS_INTERNAL ){
          pCsr->aiCell[pCsr->nPg-1] = (bNext ? 0 : nCell);
          pgno = btChildPgno(aData, pgsz, pCsr->aiCell[pCsr->nPg-1]);
        }else{
          pCsr->aiCell[pCsr->nPg-1] = (bNext ? 0 : nCell-1);
          break;
        }
      }







|

















|







640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
    }else{
      if( pCsr->aiCell[iPg]>0 ){
        pCsr->aiCell[iPg]--;
        break;
      }
    }

    rc = btCsrAscend(pCsr, 1);
    bRequireDescent = 1;
  }

  if( bRequireDescent && rc==SQLITE4_OK ){
    u32 pgno;                   /* Child page number */
    u8 *aData = (u8*)sqlite4BtPageData(pCsr->apPage[pCsr->nPg-1]);

    pgno = btChildPgno(aData, pgsz, pCsr->aiCell[pCsr->nPg-1]);

    while( 1 ){
      rc = btCsrDescend(pCsr, pgno);
      if( rc!=SQLITE4_OK ){
        break;
      }else{
        int nCell;
        aData = (u8*)sqlite4BtPageData(pCsr->apPage[pCsr->nPg-1]);
        nCell = btCellCount(aData, pgsz);
        if( btFlags(aData) & BT_PGFLAGS_INTERNAL ){
          pCsr->aiCell[pCsr->nPg-1] = (bNext ? 0 : nCell);
          pgno = btChildPgno(aData, pgsz, pCsr->aiCell[pCsr->nPg-1]);
        }else{
          pCsr->aiCell[pCsr->nPg-1] = (bNext ? 0 : nCell-1);
          break;
        }
      }
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
/*
** Move the cursor to the previous entry in the tree. This is a wrapper
** around btCsrStep() with its second argument set to zero.
*/
int sqlite4BtCsrPrev(bt_cursor *pCsr){
  const int bNext = 0;
  return btCsrStep(pCsr, bNext);
}

/*
** Set *ppK and *pnK to point to the key of the cell the cursor currently
** points to, and to its size in bytes. The returned pointer refers
** directly to the page data. Cells with a zero key-size field are not
** supported: they trigger an assert() and leave the outputs unset.
** Always returns SQLITE4_OK.
*/
int sqlite4BtCsrKey(bt_cursor *pCsr, const void **ppK, int *pnK){
  const int pgsz = sqlite4BtPagerPagesize(pCsr->pDb->pPager);
  int iCell = pCsr->aiCell[pCsr->nPg-1];
  u8 *aData = (u8*)sqlite4BtPageData(pCsr->apPage[pCsr->nPg-1]);
  u8 *pKey;
  int nK;

  assert( btCellCount(aData, pgsz)>iCell );
  pKey = btCellFind(aData, pgsz, iCell);
  pKey += sqlite4BtVarintGet32(pKey, &nK);

  if( nK!=0 ){
    *ppK = pKey;
    *pnK = nK;
  }else{
    assert( 0 );
  }

  return SQLITE4_OK;
}

static int btGrowBuffer(bt_db *db, int nReq, u8 **ppVal, int *pnVal){
  if( nReq>*pnVal ){
    u8 *pNew = sqlite4_malloc(db->pEnv, nReq*2);
    if( pNew==0 ) return btErrorBkpt(SQLITE4_NOMEM);
    sqlite4_free(db->pEnv, *ppVal);
    *ppVal = pNew;
    *pnVal = nReq*2;







<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<







687
688
689
690
691
692
693






















694
695
696
697
698
699
700
/*
** Move the cursor to the previous entry in the tree. This is a wrapper
** around btCsrStep() with its second argument set to zero.
*/
int sqlite4BtCsrPrev(bt_cursor *pCsr){
  const int bNext = 0;
  return btCsrStep(pCsr, bNext);
}























static int btGrowBuffer(bt_db *db, int nReq, u8 **ppVal, int *pnVal){
  if( nReq>*pnVal ){
    u8 *pNew = sqlite4_malloc(db->pEnv, nReq*2);
    if( pNew==0 ) return btErrorBkpt(SQLITE4_NOMEM);
    sqlite4_free(db->pEnv, *ppVal);
    *ppVal = pNew;
    *pnVal = nReq*2;
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763


























































































764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801

802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823

824
825
826
827

828



829
830




831
832
833
834
835
836
837
838
839
840
841

842
843

844
845


846


847
848
849
850
851
852
853

      for(iLvl=nDepth-1; iLvl>=0; iLvl--){
        if( apHier[iLvl].iCell<(nPgPtr-1) ) break;
      }
      if( iLvl<0 ) break; /* SQLITE4_CORRUPT? */
      apHier[iLvl].iCell++;

      for(iLvl; iLvl<nDepth && rc==SQLITE4_OK; iLvl++){
        a = sqlite4BtPageData(apHier[iLvl].pPg);
        pgno = btGetU32(&a[apHier[iLvl].iCell * 4]);
        if( iLvl<(nDepth-1) ){
          apHier[iLvl+1].iCell = 0;
          sqlite4BtPageRelease(apHier[iLvl+1].pPg);
          apHier[iLvl+1].pPg = 0;
          rc = sqlite4BtPageGet(db->pPager, pgno, &apHier[iLvl+1].pPg);
        }
      }
    }

    for(i=0; i<nDepth && rc==SQLITE4_OK; i++){
      sqlite4BtPageRelease(apHier[i].pPg);
    }
  }

  return rc;
}



























































































/*
** Set *ppV and *pnV to point to the value of the entry the cursor
** currently points to, and to its size in bytes.
**
** If the value-size field of the cell is non-zero, the value is stored
** entirely within the cell, its size is the field value minus one, and
** *ppV points directly into the page. If the field is zero, part of the
** value is stored locally and the rest on overflow pages. In that case
** the whole value is assembled in the cursor buffer pCsr->pVal and *ppV
** points to that buffer.
**
** Parameters iOffset and nByte are not used by this implementation; the
** whole value is always returned. Returns SQLITE4_OK, SQLITE4_NOMEM if
** the buffer cannot be grown, or the code returned by
** btOverflowArrayRead().
*/
int sqlite4BtCsrData(
  bt_cursor *pCsr,                /* Cursor handle */
  int iOffset,                    /* Offset of requested data */
  int nByte,                      /* Bytes requested (or -ve for all avail.) */
  const void **ppV,               /* OUT: Pointer to data buffer */
  int *pnV                        /* OUT: Size of data buffer in bytes */
){
  const int pgsz = sqlite4BtPagerPagesize(pCsr->pDb->pPager);
  int rc = SQLITE4_OK;
  u8 *aData;
  u8 *pCell;
  int iCell = pCsr->aiCell[pCsr->nPg-1];
  int nK;
  int nV;

  /* Skip over the key to reach the value-size field */
  aData = (u8*)sqlite4BtPageData(pCsr->apPage[pCsr->nPg-1]);
  pCell = btCellFind(aData, pgsz, iCell);
  pCell += sqlite4BtVarintGet32(pCell, &nK);
  assert( nK!=0 );
  pCell += nK;
  pCell += sqlite4BtVarintGet32(pCell, &nV);

  if( nV==0 ){
    int nLVal;                    /* Bytes of value stored in the cell */
    int nOVal;                    /* Bytes of value stored on overflow pages */
    int nVal;                     /* Total size of entry data */
    u8 *pOvfl;                    /* Used to locate the overflow array */


    /* After these lines pCell points to the local part of the value and
    ** pOvfl to the overflow array that follows the overflow-size field. */
    pCell += sqlite4BtVarintGet32(pCell, &nLVal);
    pOvfl = pCell + nLVal;
    pOvfl += sqlite4BtVarintGet32(pOvfl, &nOVal);
    nVal = nLVal + nOVal;

    if( btGrowBuffer(pCsr->pDb, nVal, &pCsr->pVal, &pCsr->nVal) ){
      return SQLITE4_NOMEM;
    }

    /* Local bytes first, then the bytes read from the overflow pages */
    memcpy(pCsr->pVal, pCell, nLVal);

    rc = btOverflowArrayRead(pCsr->pDb, pOvfl, &pCsr->pVal[nLVal], nOVal);
    *ppV = pCsr->pVal;
    *pnV = nVal;
  }else{
    *ppV = pCell;
    *pnV = (nV-1);
  }

  return rc;
}

/*
** The argument points to a buffer containing an overflow array. Return
** the size of the overflow array in bytes: one leading byte plus four
** bytes for each of N+1 entries, where N is the value of the low four
** bits of the leading byte.
*/
static int btOverflowArrayLen(u8 *p){
  int nEntry = (int)(p[0] & 0x0F) + 1;
  return 1 + nEntry * 4;
}

/*
** Return a pointer to cell iCell of the page and set *pnByte to the size
** of the cell in bytes.
**
** On internal pages a cell is a key followed by 4 bytes. On other pages
** the key is followed by a value-size field: if it is non-zero the value
** follows inline (size is the field minus one), otherwise a local part,
** an overflow-size field and an overflow array follow.
*/
static u8 *btCellFindSize(u8 *aData, int nData, int iCell, int *pnByte){
  u8 *pCell = btCellFind(aData, nData, iCell);
  u8 *pEnd = pCell;               /* Advanced to one byte past the cell */
  int nKey;
  int nFld;                       /* Most recently decoded size field */

  pEnd += sqlite4BtVarintGet32(pEnd, &nKey);
  assert( nKey!=0 ); /* todo. type (c) leaf cell */
  pEnd += nKey;

  if( aData[0] & BT_PGFLAGS_INTERNAL ){
    pEnd += 4;
  }else{
    pEnd += sqlite4BtVarintGet32(pEnd, &nFld);
    if( nFld!=0 ){
      pEnd += (nFld-1);
    }else{
      pEnd += sqlite4BtVarintGet32(pEnd, &nFld);
      pEnd += nFld;
      pEnd += sqlite4BtVarintGet32(pEnd, &nFld);
      pEnd += btOverflowArrayLen(pEnd);
    }
  }

  *pnByte = (pEnd - pCell);
  return pCell;
}

/*
** Allocate a new page buffer.
*/
static int btNewBuffer(bt_db *pDb, u8 **paBuf){







|


















>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>













|
|




|
|
|
|
<
<
<
<
<

|
<
<
<
<
|
<
|
<
>
|
|
<
<
<
















|
>

<
<

>
|
>
>
>
|
|
>
>
>
>
|
<









>
|
<
>
|

>
>
|
>
>







781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919





920
921




922

923

924
925
926



927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945


946
947
948
949
950
951
952
953
954
955
956
957
958

959
960
961
962
963
964
965
966
967
968
969

970
971
972
973
974
975
976
977
978
979
980
981
982
983
984

      for(iLvl=nDepth-1; iLvl>=0; iLvl--){
        if( apHier[iLvl].iCell<(nPgPtr-1) ) break;
      }
      if( iLvl<0 ) break; /* SQLITE4_CORRUPT? */
      apHier[iLvl].iCell++;

      for(; iLvl<nDepth && rc==SQLITE4_OK; iLvl++){
        a = sqlite4BtPageData(apHier[iLvl].pPg);
        pgno = btGetU32(&a[apHier[iLvl].iCell * 4]);
        if( iLvl<(nDepth-1) ){
          apHier[iLvl+1].iCell = 0;
          sqlite4BtPageRelease(apHier[iLvl+1].pPg);
          apHier[iLvl+1].pPg = 0;
          rc = sqlite4BtPageGet(db->pPager, pgno, &apHier[iLvl+1].pPg);
        }
      }
    }

    for(i=0; i<nDepth && rc==SQLITE4_OK; i++){
      sqlite4BtPageRelease(apHier[i].pPg);
    }
  }

  return rc;
}

/*
** Load the data of the current cell that is stored partly on overflow
** pages into the pCsr->ovfl buffer, and set pCsr->ovfl.nKey and
** pCsr->ovfl.nVal to the number of key and value bytes it then holds.
**
** Two cell layouts are handled, selected by the first size field:
**
**   * Zero (type (c) cell): the key is split between the cell and the
**     overflow pages and the whole value is on overflow pages. The buffer
**     receives the full key followed by the value.
**
**   * Non-zero (type (b) cell): the key is stored inline and is not
**     copied. The buffer receives only the value (ovfl.nKey is set to 0).
**
** NOTE(review): parameter bVal is not consulted by the body. All
** overflow data, including the value, is always read.
**
** Returns SQLITE4_OK on success, or the error code returned by
** btGrowBuffer() or btOverflowArrayRead().
*/
static int btOverflowBuffer(bt_cursor *pCsr, int bVal){
  const int pgsz = sqlite4BtPagerPagesize(pCsr->pDb->pPager);
  int rc = SQLITE4_OK;
  u8 *aData;
  u8 *pCell;
  int iCell = pCsr->aiCell[pCsr->nPg-1];

  int nK;
  int nReq;                       /* Bytes of buffer space required */
  u8 *pLocal;                     /* Pointer to local data within leaf page */
  u8 *aOut;                       /* Output buffer for overflow data */
  int nLocal = 0;                 /* Bytes of data stored in the cell */
  int nOvfl1 = 0;                 /* First overflow size field */
  int nOvfl2 = 0;                 /* Second overflow size field (type (c)) */

  aData = (u8*)sqlite4BtPageData(pCsr->apPage[pCsr->nPg-1]);
  pCell = btCellFind(aData, pgsz, iCell);
  pCell += sqlite4BtVarintGet32(pCell, &nK);
  if( nK==0 ){
    /* type (c) leaf cell */
    pCell += sqlite4BtVarintGet32(pCell, &nLocal);
    pLocal = pCell;
    pCell += nLocal;
    pCell += sqlite4BtVarintGet32(pCell, &nOvfl1);
    pCell += sqlite4BtVarintGet32(pCell, &nOvfl2);

    pCsr->ovfl.nKey = nLocal + nOvfl1;
    pCsr->ovfl.nVal = nOvfl2;
  }else{
    /* type (b) leaf cell */
    pCell += nK;
    assert( pCell[0]==0x00 );
    pCell++;

    pCell += sqlite4BtVarintGet32(pCell, &nLocal);
    pLocal = pCell;
    pCell += nLocal;
    pCell += sqlite4BtVarintGet32(pCell, &nOvfl1);

    pCsr->ovfl.nKey = 0;
    pCsr->ovfl.nVal = nLocal + nOvfl1;
  }

  /* In both cases pCell now points at the overflow array */
  nReq = nLocal + nOvfl1 + nOvfl2;
  rc = btGrowBuffer(pCsr->pDb, nReq, &pCsr->ovfl.pBuf, &pCsr->ovfl.nBuf);
  if( rc!=SQLITE4_OK ) return rc;

  /* Copy in local data */
  memcpy(pCsr->ovfl.pBuf, pLocal, nLocal);

  /* Load in overflow data */
  aOut = &pCsr->ovfl.pBuf[nLocal];
  rc = btOverflowArrayRead(pCsr->pDb, pCell, aOut, nOvfl1 + nOvfl2);

  return rc;
}

/*
** Set *ppK and *pnK to point to the key of the cell the cursor currently
** points to, and to its size in bytes.
**
** If the key-size field of the cell is non-zero the key is stored inline
** and *ppK points directly into the page. Otherwise the key is loaded
** into the cursor's overflow buffer by btOverflowBuffer() and *ppK points
** to that buffer. If loading fails the outputs are left unset and the
** error code is returned.
*/
int sqlite4BtCsrKey(bt_cursor *pCsr, const void **ppK, int *pnK){
  const int pgsz = sqlite4BtPagerPagesize(pCsr->pDb->pPager);
  int iCell = pCsr->aiCell[pCsr->nPg-1];
  u8 *aData = (u8*)sqlite4BtPageData(pCsr->apPage[pCsr->nPg-1]);
  u8 *pKey;
  int nK;
  int rc = SQLITE4_OK;

  assert( btCellCount(aData, pgsz)>iCell );
  pKey = btCellFind(aData, pgsz, iCell);
  pKey += sqlite4BtVarintGet32(pKey, &nK);

  if( nK!=0 ){
    /* Key stored inline */
    *ppK = pKey;
    *pnK = nK;
  }else{
    /* type (c) leaf cell */
    rc = btOverflowBuffer(pCsr, 0);
    if( rc==SQLITE4_OK ){
      *ppK = pCsr->ovfl.pBuf;
      *pnK = pCsr->ovfl.nKey;
    }
  }

  return rc;
}

/*
** Set *ppV and *pnV to point to the value of the entry the cursor
** currently points to, and to its size in bytes.
**
** If the cell has an inline key and a non-zero value-size field, the
** value is stored inline, its size is the field value minus one, and
** *ppV points directly into the page. Otherwise the entry is loaded into
** the cursor's overflow buffer by btOverflowBuffer() and *ppV points to
** the value within that buffer. If loading fails the outputs are left
** unset and the error code is returned.
**
** Parameters iOffset and nByte are not used by this implementation.
*/
int sqlite4BtCsrData(
  bt_cursor *pCsr,                /* Cursor handle */
  int iOffset,                    /* Offset of requested data */
  int nByte,                      /* Bytes requested (or -ve for all avail.) */
  const void **ppV,               /* OUT: Pointer to data buffer */
  int *pnV                        /* OUT: Size of data buffer in bytes */
){
  const int pgsz = sqlite4BtPagerPagesize(pCsr->pDb->pPager);
  int iCell = pCsr->aiCell[pCsr->nPg-1];
  u8 *aData = (u8*)sqlite4BtPageData(pCsr->apPage[pCsr->nPg-1]);
  u8 *pCell = btCellFind(aData, pgsz, iCell);
  int nK = 0;
  int nV = 0;
  int rc = SQLITE4_OK;

  /* nV is left at zero when the key-size field is zero */
  pCell += sqlite4BtVarintGet32(pCell, &nK);
  if( nK>0 ){
    pCell += nK;
    pCell += sqlite4BtVarintGet32(pCell, &nV);
  }

  if( nV!=0 ){
    /* Value stored inline */
    *ppV = pCell;
    *pnV = (nV-1);
    return rc;
  }

  /* The value is in the overflow buffer, after any key bytes */
  rc = btOverflowBuffer(pCsr, 1);
  if( rc==SQLITE4_OK ){
    *ppV = &pCsr->ovfl.pBuf[pCsr->ovfl.nKey];
    *pnV = pCsr->ovfl.nVal;
  }
  return rc;
}

/*
** The argument points to a buffer containing an overflow array. Return
** the size of the overflow array in bytes: one leading byte plus four
** bytes for each of N+1 entries, where N is the value of the low four
** bits of the leading byte.
*/
static int btOverflowArrayLen(u8 *p){
  int nEntry = (int)(p[0] & 0x0F) + 1;
  return 1 + nEntry * 4;
}

/*
** Return the size in bytes of the cell that pCell points to. Parameter
** bLeaf must be non-zero if the cell belongs to a leaf page, or zero if
** it belongs to an internal page.
**
** The following layouts are recognized:
**
**   Internal cell:  varint(nKey), key, 4 further bytes.
**
**   Leaf cell with the whole entry on the page:
**                   varint(nKey), key, varint(N), N-1 bytes of value.
**
**   Leaf cell, value overflows (second varint is zero):
**                   varint(nKey), key, 0x00, varint(nLocal), nLocal
**                   bytes, varint, overflow array.
**
**   Leaf cell, key overflows (first varint is zero):
**                   0x00, varint(nLocal), nLocal bytes, varint, varint,
**                   overflow array.
*/
static int btCellSize(u8 *pCell, int bLeaf){
  u8 *pEnd = pCell;               /* Advances to one byte past the cell */
  int nField;                     /* Most recently decoded varint */
  int bKeyOvfl;                   /* True if first varint of cell is zero */

  pEnd += sqlite4BtVarintGet32(pEnd, &nField);

  /* Internal cell: key bytes followed by a fixed 4 byte field. */
  if( bLeaf==0 ){
    pEnd += nField;
    pEnd += 4;
    return (pEnd-pCell);
  }

  bKeyOvfl = (nField==0);
  if( !bKeyOvfl ){
    /* Key is stored locally. Skip it and read the value descriptor. */
    pEnd += nField;
    pEnd += sqlite4BtVarintGet32(pEnd, &nField);
    if( nField!=0 ){
      /* Entire value is local. The descriptor is (value size + 1). */
      pEnd += (nField-1);
      return (pEnd-pCell);
    }
  }

  /* Both overflow forms continue with a varint giving the number of
  ** bytes stored locally, followed by those bytes. */
  pEnd += sqlite4BtVarintGet32(pEnd, &nField);
  pEnd += nField;

  /* Next come the sizes of the overflowing data: two varints if the key
  ** overflows, one if only the value does. Then the overflow array. */
  if( bKeyOvfl ){
    pEnd += sqlite4BtVarintGet32(pEnd, &nField);
  }
  pEnd += sqlite4BtVarintGet32(pEnd, &nField);
  pEnd += btOverflowArrayLen(pEnd);

  return (pEnd-pCell);
}

/*
** Locate cell iCell on the page whose data is stored in buffer aData
** (nData is passed through to btCellFind() unchanged). Return a pointer
** to the start of the cell and set *pnByte to its size in bytes, as
** calculated by btCellSize(). The page is treated as a leaf unless the
** BT_PGFLAGS_INTERNAL flag is set in its flags.
*/
static u8 *btCellFindSize(u8 *aData, int nData, int iCell, int *pnByte){
  u8 *pCell = btCellFind(aData, nData, iCell);
  int bLeaf = ((btFlags(aData) & BT_PGFLAGS_INTERNAL)==0);

  *pnByte = btCellSize(pCell, bLeaf);
  return pCell;
}

/*
** Allocate a new page buffer.
*/
static int btNewBuffer(bt_db *pDb, u8 **paBuf){
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
  int nHdr;                       /* Bytes in header of this page */

  if( btNewBuffer(pDb, &aTmp) ) return SQLITE4_NOMEM;

  aData = sqlite4BtPageData(pPg);
  nCell = btCellCount(aData, pgsz);

  bLeaf = 0==(btFlags(aData, pgsz) & BT_PGFLAGS_INTERNAL);
  nHdr = bLeaf ? 1 : 5;

  /* Set header bytes of new page */
  memcpy(aTmp, aData, nHdr);

  iWrite = nHdr;
  for(i=0; i<nCell; i++){
    int nByte;
    u8 *pCell;
    pCell = btCellFindSize(aData, pgsz, i, &nByte);

    btPutU16(btCellPtrFind(aTmp, pgsz, i), iWrite);
    memcpy(&aTmp[iWrite], pCell, nByte);
    iWrite += nByte;
  }


  /* Write the rest of the page footer */
  btPutU16(&aTmp[pgsz-2], nCell);
  btPutU16(&aTmp[pgsz-4], pgsz - (3+nCell)*2 - iWrite);
  btPutU16(&aTmp[pgsz-6], iWrite);

  btSetBuffer(pDb, pPg, aTmp);







|















<







1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056

1057
1058
1059
1060
1061
1062
1063
  int nHdr;                       /* Bytes in header of this page */

  if( btNewBuffer(pDb, &aTmp) ) return SQLITE4_NOMEM;

  aData = sqlite4BtPageData(pPg);
  nCell = btCellCount(aData, pgsz);

  bLeaf = 0==(btFlags(aData) & BT_PGFLAGS_INTERNAL);
  nHdr = bLeaf ? 1 : 5;

  /* Set header bytes of new page */
  memcpy(aTmp, aData, nHdr);

  iWrite = nHdr;
  for(i=0; i<nCell; i++){
    int nByte;
    u8 *pCell;
    pCell = btCellFindSize(aData, pgsz, i, &nByte);

    btPutU16(btCellPtrFind(aTmp, pgsz, i), iWrite);
    memcpy(&aTmp[iWrite], pCell, nByte);
    iWrite += nByte;
  }


  /* Write the rest of the page footer */
  btPutU16(&aTmp[pgsz-2], nCell);
  btPutU16(&aTmp[pgsz-4], pgsz - (3+nCell)*2 - iWrite);
  btPutU16(&aTmp[pgsz-6], iWrite);

  btSetBuffer(pDb, pPg, aTmp);
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162

1163
1164
1165
1166
1167
1168
1169
** Argument pKV contains a key/value pair destined for a leaf page in
** a database with page size pgsz. Currently it is in KV_VALUE form.
** If the key/value pair is too large to fit entirely within a leaf
** page, this function allocates and writes the required overflow
** pages to the database, and converts pKV to a KV_CELL cell (that
** contains the overflow array).
*/
static int btAssignOverflow(bt_db *db, KeyValue *pKV){
  const int pgsz = sqlite4BtPagerPagesize(db->pPager);
  int nMaxSize = (pgsz - 1 - 6 - 2);
  int nReq;
  int rc = SQLITE4_OK;

  assert( pKV->pgno==0 && pKV->eType==KV_VALUE );

  /* Check if this is a type (a) cell - one that can fit entirely on a 
  ** leaf page. If so, do nothing.  */
  nReq = sqlite4BtVarintLen32(pKV->nK) + sqlite4BtVarintLen32(pKV->nV);
  nReq += pKV->nK + pKV->nV;
  if( nReq > nMaxSize ){
    u8 *pBuf = 0;                 /* Buffer containing formatted cell */

    int nArraySz = btOverflowArraySz(pgsz, pKV->nK + pKV->nV);

    int nKeyOvfl;                 /* Bytes of key that overflow */
    int nValOvfl;                 /* Bytes of value that overflow */

    /* Check if the entire key can fit on a leaf page. If so, this is a
    ** type (b) page - entire key and partial value on the leaf page, 
    ** overflow pages contain the rest of the value.  */
    nReq = 1 + sqlite4BtVarintLen32(pKV->nK) + pKV->nK 







|












<
<

>







1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289


1290
1291
1292
1293
1294
1295
1296
1297
1298
** Argument pKV contains a key/value pair destined for a leaf page in
** a database with page size pgsz. Currently it is in KV_VALUE form.
** If the key/value pair is too large to fit entirely within a leaf
** page, this function allocates and writes the required overflow
** pages to the database, and converts pKV to a KV_CELL cell (that
** contains the overflow array).
*/
static int btOverflowAssign(bt_db *db, KeyValue *pKV){
  const int pgsz = sqlite4BtPagerPagesize(db->pPager);
  int nMaxSize = (pgsz - 1 - 6 - 2);
  int nReq;
  int rc = SQLITE4_OK;

  assert( pKV->pgno==0 && pKV->eType==KV_VALUE );

  /* Check if this is a type (a) cell - one that can fit entirely on a 
  ** leaf page. If so, do nothing.  */
  nReq = sqlite4BtVarintLen32(pKV->nK) + sqlite4BtVarintLen32(pKV->nV);
  nReq += pKV->nK + pKV->nV;
  if( nReq > nMaxSize ){


    int nArraySz = btOverflowArraySz(pgsz, pKV->nK + pKV->nV);
    u8 *pBuf = 0;                 /* Buffer containing formatted cell */
    int nKeyOvfl;                 /* Bytes of key that overflow */
    int nValOvfl;                 /* Bytes of value that overflow */

    /* Check if the entire key can fit on a leaf page. If so, this is a
    ** type (b) page - entire key and partial value on the leaf page, 
    ** overflow pages contain the rest of the value.  */
    nReq = 1 + sqlite4BtVarintLen32(pKV->nK) + pKV->nK 
1181
1182
1183
1184
1185
1186
1187






1188


1189
1190




1191

1192
1193

1194
1195



1196
1197
1198
1199
1200
1201
1202
1203



1204
1205
1206

1207
1208
1209
1210
1211
1212
1213
1214
1215

1216
1217
1218
1219
1220
1221
1222
          - sqlite4BtVarintLen32(pKV->nK) - pKV->nK - 1 - nArraySz
      );
      nLVal = nSpc - sqlite4BtVarintLen32(pgsz) - sqlite4BtVarintLen32(pKV->nV);
      nKeyOvfl = 0;
      nValOvfl = pKV->nV - nLVal;
    }else{
      /* Type (c) cell. Both the key and value overflow. */






      assert( 0 );


    }





    rc = btNewBuffer(db, &pBuf);

    if( rc==SQLITE4_OK ){
      int nLVal = (pKV->nV - nValOvfl);

      u8 *pOut = pBuf;




      pOut += sqlite4BtVarintPut32(pOut, pKV->nK);
      memcpy(pOut, pKV->pK, pKV->nK);
      pOut += pKV->nK;

      *pOut++ = 0x00;
      pOut += sqlite4BtVarintPut32(pOut, nLVal);
      memcpy(pOut, pKV->pV, nLVal);
      pOut += nLVal;



      pOut += sqlite4BtVarintPut32(pOut, nValOvfl);

      rc = btOverflowArrayPopulate(db, &pOut, 0, 0, 

          (u8*)(pKV->pV) + (pKV->nV - nValOvfl), nValOvfl
      );
      if( rc==SQLITE4_OK ){
        memset(pKV, 0, sizeof(*pKV));
        pKV->pV = pBuf;
        pKV->nV = pOut - pBuf;
        pKV->eType = KV_CELL;
        pBuf = 0;
        assert( pKV->nV<=nMaxSize );

      }
    }

    if( pBuf ){
      btFreeBuffer(db, pBuf);
    }
  }







>
>
>
>
>
>
|
>
>


>
>
>
>

>


>


>
>
>
|
|
|
|
|
|
|
|
>
>
>


|
>
|








>







1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
          - sqlite4BtVarintLen32(pKV->nK) - pKV->nK - 1 - nArraySz
      );
      nLVal = nSpc - sqlite4BtVarintLen32(pgsz) - sqlite4BtVarintLen32(pKV->nV);
      nKeyOvfl = 0;
      nValOvfl = pKV->nV - nLVal;
    }else{
      /* Type (c) cell. Both the key and value overflow. */
      int nLKey = nMaxSize 
          - 1                                    /* 0x00 byte */
          - sqlite4BtVarintLen32(pgsz)           /* nLKey */
          - sqlite4BtVarintLen32(pKV->nK)        /* nOKey */
          - sqlite4BtVarintLen32(pKV->nV)        /* nVal */
          - nArraySz;                            /* overflow array */

      nValOvfl = pKV->nV;
      nKeyOvfl = pKV->nK - nLKey;
    }

    /* Allocate a pager buffer to store the KV_CELL buffer. Using a pager
    ** buffer is convenient here as (a) it is roughly the right size and
    ** (b) can probably be allocated/deallocated faster than a regular
    ** heap allocation.  */
    rc = btNewBuffer(db, &pBuf);

    if( rc==SQLITE4_OK ){
      int nLVal = (pKV->nV - nValOvfl);
      int nLKey = (pKV->nK - nKeyOvfl);
      u8 *pOut = pBuf;

      if( nKeyOvfl>0 ){
        *pOut++ = 0x00;
      }
      pOut += sqlite4BtVarintPut32(pOut, nLKey);
      memcpy(pOut, pKV->pK, nLKey);
      pOut += nLKey;
      if( nKeyOvfl==0 ){
        *pOut++ = 0x00;
        pOut += sqlite4BtVarintPut32(pOut, nLVal);
        memcpy(pOut, pKV->pV, nLVal);
        pOut += nLVal;
      }else{
        pOut += sqlite4BtVarintPut32(pOut, nKeyOvfl);
      }
      pOut += sqlite4BtVarintPut32(pOut, nValOvfl);

      rc = btOverflowArrayPopulate(db, &pOut,
          (u8*)(pKV->pK) + nLKey, nKeyOvfl,
          (u8*)(pKV->pV) + nLVal, nValOvfl
      );
      if( rc==SQLITE4_OK ){
        memset(pKV, 0, sizeof(*pKV));
        pKV->pV = pBuf;
        pKV->nV = pOut - pBuf;
        pKV->eType = KV_CELL;
        pBuf = 0;
        assert( pKV->nV<=nMaxSize );
        assert( pKV->nV==btCellSize((u8*)pKV->pV, 1) );
      }
    }

    if( pBuf ){
      btFreeBuffer(db, pBuf);
    }
  }
1675
1676
1677
1678
1679
1680
1681



1682

1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
#endif

  /* The leaves are written. Now gather the keys and page numbers to
  ** push up into the parent page. This is only required when rebalancing
  ** b-tree leaves. When internal nodes are balanced, the btBalanceOutput
  ** loop accumulates the cells destined for the parent page.  */
  for(iPg=0; iPg<(ctx.nOut-1); iPg++){



    if( bLeaf ){

      u8 *aData = sqlite4BtPageData(ctx.apPg[iPg]);
      u8 *pCell;

      pCell = btCellFind(aData, pgsz, btCellCount(aData, pgsz)-1);
      ctx.aPCell[iPg].pgno = sqlite4BtPagePgno(ctx.apPg[iPg]);
      pCell += sqlite4BtVarintGet32(pCell, &ctx.aPCell[iPg].nK);
      ctx.aPCell[iPg].pK = pCell;
    }else{
      ctx.aPCell[iPg].pgno = sqlite4BtPagePgno(ctx.apPg[iPg]);
    }
  }

  rc = btSetChildPgno(
      pDb, pPar, iSib+ctx.nIn-1, sqlite4BtPagePgno(ctx.apPg[ctx.nOut-1])
  );
  if( rc==SQLITE4_OK ){
    btCsrAscend(pCsr);
    rc = btDeleteFromPage(pCsr, ctx.nIn-1);
  }
  iPg = pCsr->nPg;
  if( rc==SQLITE4_OK && ctx.nOut>1 ){
    rc = btInsertAndBalance(pCsr, ctx.nOut-1, ctx.aPCell);
  }
  if( rc==SQLITE4_OK && iPg==pCsr->nPg ){







>
>
>

>


<

<


|
<







|







1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839

1840

1841
1842
1843

1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
#endif

  /* The leaves are written. Now gather the keys and page numbers to
  ** push up into the parent page. This is only required when rebalancing
  ** b-tree leaves. When internal nodes are balanced, the btBalanceOutput
  ** loop accumulates the cells destined for the parent page.  */
  for(iPg=0; iPg<(ctx.nOut-1); iPg++){
    ctx.aPCell[iPg].pgno = sqlite4BtPagePgno(ctx.apPg[iPg]);
    assert( ctx.aPCell[iPg].nK==0 );

    if( bLeaf ){
#if 0
      u8 *aData = sqlite4BtPageData(ctx.apPg[iPg]);
      u8 *pCell;

      pCell = btCellFind(aData, pgsz, btCellCount(aData, pgsz)-1);

      pCell += sqlite4BtVarintGet32(pCell, &ctx.aPCell[iPg].nK);
      ctx.aPCell[iPg].pK = pCell;
#endif

    }
  }

  rc = btSetChildPgno(
      pDb, pPar, iSib+ctx.nIn-1, sqlite4BtPagePgno(ctx.apPg[ctx.nOut-1])
  );
  if( rc==SQLITE4_OK ){
    btCsrAscend(pCsr, 1);
    rc = btDeleteFromPage(pCsr, ctx.nIn-1);
  }
  iPg = pCsr->nPg;
  if( rc==SQLITE4_OK && ctx.nOut>1 ){
    rc = btInsertAndBalance(pCsr, ctx.nOut-1, ctx.aPCell);
  }
  if( rc==SQLITE4_OK && iPg==pCsr->nPg ){
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
  aData = (u8*)sqlite4BtPageData(pLeaf);

  /* Set the bLeaf variable to true if inserting into a leaf page, or
  ** false otherwise. Return SQLITE4_CORRUPT if the page is a leaf but
  ** the KeyValue pairs being inserted are suitable for internal nodes,
  ** or vice-versa.  */
  assert( nKV>0 );
  if( (0==(btFlags(aData, pgsz) & BT_PGFLAGS_INTERNAL))!=bLeaf ){
    return btErrorBkpt(SQLITE4_CORRUPT);
  }

  nCell = btCellCount(aData, pgsz);
  assert( iCell<=btCellCount(aData, pgsz) );

  if( nCell==0 ){







|







1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
  aData = (u8*)sqlite4BtPageData(pLeaf);

  /* Set the bLeaf variable to true if inserting into a leaf page, or
  ** false otherwise. Return SQLITE4_CORRUPT if the page is a leaf but
  ** the KeyValue pairs being inserted are suitable for internal nodes,
  ** or vice-versa.  */
  assert( nKV>0 );
  if( (0==(btFlags(aData) & BT_PGFLAGS_INTERNAL))!=bLeaf ){
    return btErrorBkpt(SQLITE4_CORRUPT);
  }

  nCell = btCellCount(aData, pgsz);
  assert( iCell<=btCellCount(aData, pgsz) );

  if( nCell==0 ){
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
  const int pgsz = sqlite4BtPagerPagesize(pCsr->pDb->pPager);
  int rc = SQLITE4_OK;
  int iPg = pCsr->nPg-1;
  BtPage *pPg = pCsr->apPage[iPg];
  u8 *aData = sqlite4BtPageData(pPg);
  int nCell = btCellCount(aData, pgsz);
  int nFree = btFreeSpace(aData, pgsz);
  int bLeaf = (0==(btFlags(aData, pgsz) & BT_PGFLAGS_INTERNAL));

  if( iPg==0 ){
    /* Root page. If it contains no cells at all and is not already
    ** a leaf, shorten the tree by one here by copying the contents 
    ** of the only child into the root. */
    if( nCell==0 && bLeaf==0 ){
      BtPager *pPager = pCsr->pDb->pPager;







|







2079
2080
2081
2082
2083
2084
2085
2086
2087
2088
2089
2090
2091
2092
2093
  const int pgsz = sqlite4BtPagerPagesize(pCsr->pDb->pPager);
  int rc = SQLITE4_OK;
  int iPg = pCsr->nPg-1;
  BtPage *pPg = pCsr->apPage[iPg];
  u8 *aData = sqlite4BtPageData(pPg);
  int nCell = btCellCount(aData, pgsz);
  int nFree = btFreeSpace(aData, pgsz);
  int bLeaf = (0==(btFlags(aData) & BT_PGFLAGS_INTERNAL));

  if( iPg==0 ){
    /* Root page. If it contains no cells at all and is not already
    ** a leaf, shorten the tree by one here by copying the contents 
    ** of the only child into the root. */
    if( nCell==0 && bLeaf==0 ){
      BtPager *pPager = pCsr->pDb->pPager;
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992
1993


1994


1995
1996
1997
1998
1999
2000
2001
    /* Insert the new KV pair into the current leaf. */
    KeyValue kv;
    kv.pgno = 0;
    kv.eType = KV_VALUE;
    kv.pK = pK; kv.nK = nK;
    kv.pV = pV; kv.nV = nV;

    rc = btAssignOverflow(db, &kv);
    if( rc==SQLITE4_OK ){
      rc = btInsertAndBalance(&csr, 1, &kv);
    }


  }



  return rc;
}

int sqlite4BtDelete(bt_cursor *pCsr){
  int rc;
  rc =  btDeleteFromPage(pCsr, 1);







|



>
>
|
>
>







2135
2136
2137
2138
2139
2140
2141
2142
2143
2144
2145
2146
2147
2148
2149
2150
2151
2152
2153
2154
2155
2156
2157
    /* Insert the new KV pair into the current leaf. */
    KeyValue kv;
    kv.pgno = 0;
    kv.eType = KV_VALUE;
    kv.pK = pK; kv.nK = nK;
    kv.pV = pV; kv.nV = nV;

    rc = btOverflowAssign(db, &kv);
    if( rc==SQLITE4_OK ){
      rc = btInsertAndBalance(&csr, 1, &kv);
    }
    if( kv.eType==KV_CELL ){
      sqlite4_free(db->pEnv, (void*)kv.pV);
    }
  }
  btCsrReset(&csr, 1);

  return rc;
}

int sqlite4BtDelete(bt_cursor *pCsr){
  int rc;
  rc =  btDeleteFromPage(pCsr, 1);
Changes to src/bt_pager.c.
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
/*
** Read, write and trim existing database pages.
*/
int sqlite4BtPageGet(BtPager *p, u32 pgno, BtPage **ppPg){
  int rc = SQLITE4_OK;            /* Return code */
  BtPage *pRet;                   /* Returned page handle */

  if( pgno>100000 ){
    return btErrorBkpt(SQLITE4_CORRUPT);
  }

  /* Search the cache for an existing page. */
  pRet = btHashSearch(p, pgno);

  /* If the page is not in the cache, load it from disk */
  if( pRet==0 ){
    rc = btAllocatePage(p, &pRet);
    if( rc==SQLITE4_OK ){







<
<
<
<







456
457
458
459
460
461
462




463
464
465
466
467
468
469
/*
** Read, write and trim existing database pages.
*/
int sqlite4BtPageGet(BtPager *p, u32 pgno, BtPage **ppPg){
  int rc = SQLITE4_OK;            /* Return code */
  BtPage *pRet;                   /* Returned page handle */





  /* Search the cache for an existing page. */
  pRet = btHashSearch(p, pgno);

  /* If the page is not in the cache, load it from disk */
  if( pRet==0 ){
    rc = btAllocatePage(p, &pRet);
    if( rc==SQLITE4_OK ){
Changes to test/simple3.test.
12
13
14
15
16
17
18


































19
20
21
22
23
24
25
#
set testdir [file dirname $argv0]
source $testdir/tester.tcl
set testprefix simple3

db close
forcedelete test.db



































do_test 1.0 {
  sqlite4 db file:test.db?kv=bt
  db close
} {}

do_test 1.1 { sqlite4 db file:test.db?kv=bt } {}







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
#
set testdir [file dirname $argv0]
source $testdir/tester.tcl
set testprefix simple3

db close
forcedelete test.db

# Return a string exactly $n characters long, built by repeating the
# lowercase alphabet "abcdefghijklmnopqrstuvwxyz" and truncating the
# result. An empty string is returned when $n is zero.
#
# The expressions are braced so that they are compiled once and operands
# are not substituted twice.
proc bigstr {n} {
  set alphabet "abcdefghijklmnopqrstuvwxyz"
  # One more repetition than ($n / alphabet-length) always yields at
  # least $n characters.
  set nRep [expr {1 + ($n / [string length $alphabet])}]
  string range [string repeat $alphabet $nRep] 0 [expr {$n - 1}]
}

if 0 {
sqlite4 db file:test.db?kv=bt
foreach {tn nStr} {
  1 3000
  2 30000
  3 300000
  4 3000000
  5 30000000
} {
  set big [bigstr $nStr]
  do_execsql_test 6.$tn.1 {
    DROP TABLE IF EXISTS t6;
    CREATE TABLE t6(a PRIMARY KEY, b VALUE);
    INSERT INTO t6 VALUES($big, '123');
  }

  do_execsql_test 6.$tn.2 {
    SELECT length(a) FROM t6;
  } $nStr

  do_execsql_test 6.$tn.3 {
    SELECT a FROM t6;
  } [list $big]
}

finish_test
}


do_test 1.0 {
  sqlite4 db file:test.db?kv=bt
  db close
} {}

do_test 1.1 { sqlite4 db file:test.db?kv=bt } {}
184
185
186
187
188
189
190

191
192
193
194
195
196
























197
198
    CREATE TABLE t5(a PRIMARY KEY, b VALUE);
    INSERT INTO t5 VALUES(1, $big);
  }

  do_execsql_test 5.$tn.2 {
    SELECT length(b) FROM t5;
  } $nStr


  do_execsql_test 5.$tn.3 {
    SELECT b FROM t5;
  } [list $big]
}

























finish_test








>






>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>


218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
    CREATE TABLE t5(a PRIMARY KEY, b VALUE);
    INSERT INTO t5 VALUES(1, $big);
  }

  do_execsql_test 5.$tn.2 {
    SELECT length(b) FROM t5;
  } $nStr
  if {[set_test_counter errors]} break

  do_execsql_test 5.$tn.3 {
    SELECT b FROM t5;
  } [list $big]
}

foreach {tn nStr} {
  1 3000
  2 30000
  3 300000
  4 3000000
  5 30000000
} {
  set big [bigstr $nStr]
  do_execsql_test 6.$tn.1 {
    DROP TABLE IF EXISTS t6;
    CREATE TABLE t6(a PRIMARY KEY, b VALUE);
    INSERT INTO t6 VALUES($big, '123');
  }

  do_execsql_test 6.$tn.2 {
    SELECT length(a) FROM t6;
  } $nStr
  if {[set_test_counter errors]} break

  do_execsql_test 6.$tn.3 {
    SELECT a FROM t6;
  } [list $big]
}

finish_test

Changes to www/bt.wiki.
456
457
458
459
460
461
462
463
464
465
466
467
468




469
470
471
472
473
474
475
is instead read dynamically from the leaf page containing the smallest keys
larger than or equal to the omitted key-prefix. This leaf may be found by
following the existing tree pointers. N is selected so as to guarantee a
minimum internal fanout of (say) 4 or 8.

<p>Any real key/value pair that is too large for a single leaf page is
spread across the leaf and one or more overflow pages. Overflow pages are
organized differently to those in SQLite with two goals in mind:

<ul>
  <li> To make random access to offsets within large values more efficient, and
  <li> To make it possible to append data to an existing value without
       rewriting all constituent pages.




</ul>

<i>
  <p> Inode-style overflow pages (instead of a linked list)?
</i>

<i>







|





>
>
>
>







456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
is instead read dynamically from the leaf page containing the smallest keys
larger than or equal to the omitted key-prefix. This leaf may be found by
following the existing tree pointers. N is selected so as to guarantee a
minimum internal fanout of (say) 4 or 8.

<p>Any real key/value pair that is too large for a single leaf page is
spread across the leaf and one or more overflow pages. Overflow pages are
organized differently to those in SQLite with three goals in mind:

<ul>
  <li> To make random access to offsets within large values more efficient,
  <li> To make it possible to append data to an existing value without
       rewriting all constituent pages, and
  <li> To make it possible to store large objects contiguously within the
       database file, so that if the file is memory mapped, a pointer to
       the object can be returned to the user without copying it into
       heap memory.
</ul>

<i>
  <p> Inode-style overflow pages (instead of a linked list)?
</i>

<i>