/ Check-in [cf6da4a5]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Fix harmless compiler warnings seen with MSVC for lsm1.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA3-256:cf6da4a52f7f9047e653ef2972e4c0910b29d7182d789a9e30225dc1849e8779
User & Date: mistachkin 2017-07-11 16:36:10
Context
2017-07-11
16:46
Fix more harmless compiler warnings in lsm1, seen with MSVC 2015. check-in: 0f1307f1 user: mistachkin tags: trunk
16:36
Fix harmless compiler warnings seen with MSVC for lsm1. check-in: cf6da4a5 user: mistachkin tags: trunk
13:59
Add support for tab-completion (using the ext/misc/completion.c virtual table) to the command-line shell. check-in: 95cd1d9f user: drh tags: trunk
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to ext/lsm1/Makefile.msc.

1
2
3
4
5
6
7
8
9
10
11
12
13
..
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
..
92
93
94
95
96
97
98
99
100
#
# This Makefile is designed for use with main.mk in the root directory of
# this project. After including main.mk, the users makefile should contain:
#
#    LSMDIR=$(TOP)\ext\lsm1\
#    include $(LSMDIR)\Makefile.msc
#
# The most useful targets are [lsmtest.exe] and [lsm.dll].
#

LSMOBJ    = \
  lsm_ckpt.lo \
  lsm_file.lo \
................................................................................
             $(LSMDIR)\lsm-test\lsmtest9.c                                   \
             $(LSMDIR)\lsm-test\lsmtest_datasource.c \
             $(LSMDIR)\lsm-test\lsmtest_func.c $(LSMDIR)\lsm-test\lsmtest_io.c  \
             $(LSMDIR)\lsm-test\lsmtest_main.c $(LSMDIR)\lsm-test\lsmtest_mem.c \
             $(LSMDIR)\lsm-test\lsmtest_tdb.c $(LSMDIR)\lsm-test\lsmtest_tdb3.c \
             $(LSMDIR)\lsm-test\lsmtest_util.c $(LSMDIR)\lsm-test\lsmtest_win32.c

# all: lsm.dll

LSMOPTS = $(NO_WARN) -DLSM_MUTEX_WIN32=1 -I$(LSMDIR)

!IF $(DEBUG)>2
LSMOPTS = $(LSMOPTS) -DLSM_DEBUG=1
!ENDIF

................................................................................

lsm_vtab.lo:	$(LSMDIR)\lsm_vtab.c $(LSMHDR) $(SQLITE3H)
	$(LTCOMPILE) $(LSMOPTS) -c $(LSMDIR)\lsm_vtab.c

lsm.dll:	$(LSMOBJ)
	$(LD) $(LDFLAGS) $(LTLINKOPTS) $(LTLIBPATHS) /DLL /OUT:$@ $(LSMOBJ)

lsmtest.exe: $(LSMOBJ) $(LSMTESTSRC) $(LSMTESTHDR) $(LIBOBJS1)
	$(LTLINK) $(LSMOPTS) $(LSMTESTSRC) /link $(LSMOBJ) $(LIBOBJS1)

|
|

|
|







 







|







 







|
|
1
2
3
4
5
6
7
8
9
10
11
12
13
..
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
..
92
93
94
95
96
97
98
99
100
#
# This Makefile is designed for use with Makefile.msc in the root directory
# of this project.  The Makefile.msc should contain:
#
#    LSMDIR=$(TOP)\ext\lsm1
#    !INCLUDE $(LSMDIR)\Makefile.msc
#
# The most useful targets are [lsmtest.exe] and [lsm.dll].
#

LSMOBJ    = \
  lsm_ckpt.lo \
  lsm_file.lo \
................................................................................
             $(LSMDIR)\lsm-test\lsmtest9.c                                   \
             $(LSMDIR)\lsm-test\lsmtest_datasource.c \
             $(LSMDIR)\lsm-test\lsmtest_func.c $(LSMDIR)\lsm-test\lsmtest_io.c  \
             $(LSMDIR)\lsm-test\lsmtest_main.c $(LSMDIR)\lsm-test\lsmtest_mem.c \
             $(LSMDIR)\lsm-test\lsmtest_tdb.c $(LSMDIR)\lsm-test\lsmtest_tdb3.c \
             $(LSMDIR)\lsm-test\lsmtest_util.c $(LSMDIR)\lsm-test\lsmtest_win32.c

# all: lsm.dll lsmtest.exe

LSMOPTS = $(NO_WARN) -DLSM_MUTEX_WIN32=1 -I$(LSMDIR)

!IF $(DEBUG)>2
LSMOPTS = $(LSMOPTS) -DLSM_DEBUG=1
!ENDIF

................................................................................

lsm_vtab.lo:	$(LSMDIR)\lsm_vtab.c $(LSMHDR) $(SQLITE3H)
	$(LTCOMPILE) $(LSMOPTS) -c $(LSMDIR)\lsm_vtab.c

lsm.dll:	$(LSMOBJ)
	$(LD) $(LDFLAGS) $(LTLINKOPTS) $(LTLIBPATHS) /DLL /OUT:$@ $(LSMOBJ)

lsmtest.exe: $(LSMOBJ) $(LSMTESTSRC) $(LSMTESTHDR) $(LIBOBJ)
	$(LTLINK) $(LSMOPTS) $(LSMTESTSRC) /link $(LSMOBJ) $(LIBOBJ)

Changes to ext/lsm1/lsm_ckpt.c.

218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
...
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
**
** The value of the nCkpt parameter includes the two checksum values at
** the end of the checkpoint. They are not used as inputs to the checksum 
** calculation. The checksum is based on the array of (nCkpt-2) integers
** at aCkpt[].
*/
static void ckptChecksum(u32 *aCkpt, u32 nCkpt, u32 *piCksum1, u32 *piCksum2){
  int i;
  u32 cksum1 = 1;
  u32 cksum2 = 2;

  if( nCkpt % 2 ){
    cksum1 += aCkpt[nCkpt-3] & 0x0000FFFF;
    cksum2 += aCkpt[nCkpt-3] & 0xFFFF0000;
  }
................................................................................
  Segment *pSegment               /* Populate this structure */
){
  assert( pSegment->iFirst==0 && pSegment->iLastPg==0 );
  assert( pSegment->nSize==0 && pSegment->iRoot==0 );
  pSegment->iFirst = ckptGobble64(aIn, piIn);
  pSegment->iLastPg = ckptGobble64(aIn, piIn);
  pSegment->iRoot = ckptGobble64(aIn, piIn);
  pSegment->nSize = ckptGobble64(aIn, piIn);
  assert( pSegment->iFirst );
}

static int ckptSetupMerge(lsm_db *pDb, u32 *aInt, int *piIn, Level *pLevel){
  Merge *pMerge;                  /* Allocated Merge object */
  int nInput;                     /* Number of input segments in merge */
  int iIn = *piIn;                /* Next value to read from aInt[] */







|







 







|







218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
...
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
**
** The value of the nCkpt parameter includes the two checksum values at
** the end of the checkpoint. They are not used as inputs to the checksum 
** calculation. The checksum is based on the array of (nCkpt-2) integers
** at aCkpt[].
*/
static void ckptChecksum(u32 *aCkpt, u32 nCkpt, u32 *piCksum1, u32 *piCksum2){
  u32 i;
  u32 cksum1 = 1;
  u32 cksum2 = 2;

  if( nCkpt % 2 ){
    cksum1 += aCkpt[nCkpt-3] & 0x0000FFFF;
    cksum2 += aCkpt[nCkpt-3] & 0xFFFF0000;
  }
................................................................................
  Segment *pSegment               /* Populate this structure */
){
  assert( pSegment->iFirst==0 && pSegment->iLastPg==0 );
  assert( pSegment->nSize==0 && pSegment->iRoot==0 );
  pSegment->iFirst = ckptGobble64(aIn, piIn);
  pSegment->iLastPg = ckptGobble64(aIn, piIn);
  pSegment->iRoot = ckptGobble64(aIn, piIn);
  pSegment->nSize = (int)ckptGobble64(aIn, piIn);
  assert( pSegment->iFirst );
}

static int ckptSetupMerge(lsm_db *pDb, u32 *aInt, int *piIn, Level *pLevel){
  Merge *pMerge;                  /* Allocated Merge object */
  int nInput;                     /* Number of input segments in merge */
  int iIn = *piIn;                /* Next value to read from aInt[] */

Changes to ext/lsm1/lsm_file.c.

536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
...
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
....
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
....
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
....
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
....
2083
2084
2085
2086
2087
2088
2089
2090
2091
2092
2093
2094
2095
2096
2097
2098
2099
2100
....
2453
2454
2455
2456
2457
2458
2459
2460
2461
2462
2463
2464
2465
2466
2467
2468
....
2473
2474
2475
2476
2477
2478
2479
2480
2481
2482
2483
2484
2485
2486
2487
....
2796
2797
2798
2799
2800
2801
2802
2803
2804
2805
2806
2807
2808
2809
2810
  return ((i64)iReal*pFS->nPagesize <= pFS->nMapLimit);
}

/*
** Given that there are currently nHash slots in the hash table, return 
** the hash key for file iFile, page iPg.
*/
static int fsHashKey(int nHash, int iPg){
  return (iPg % nHash);
}

/*
** This is a helper function for lsmFsOpen(). It opens a single file on
** disk (either the database or log file).
*/
................................................................................

/*
** Return the block number of the block that page iPg is located on. 
** Blocks are numbered starting from 1.
*/
static int fsPageToBlock(FileSystem *pFS, Pgno iPg){
  if( pFS->pCompress ){
    return (iPg / pFS->nBlocksize) + 1;
  }else{
    return 1 + ((iPg-1) / (pFS->nBlocksize / pFS->nPagesize));
  }
}

/*
** Return true if page iPg is the last page on its block.
**
** This function is only called in non-compressed database mode.
................................................................................
  i64 iEob;                       /* End of block */
  int nRead;
  int rc;

  assert( pFS->pCompress );

  iEob = fsLastPageOnPagesBlock(pFS, iOff) + 1;
  nRead = LSM_MIN(iEob - iOff, nData);

  rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iOff, aData, nRead);
  if( rc==LSM_OK && nRead!=nData ){
    int iBlk;

    rc = fsBlockNext(pFS, pSeg, fsPageToBlock(pFS, iOff), &iBlk);
    if( rc==LSM_OK ){
................................................................................
static int fsFreeBlock(
  FileSystem *pFS,                /* File system object */
  Snapshot *pSnapshot,            /* Worker snapshot */
  Segment *pIgnore,               /* Ignore this run when searching */
  int iBlk                        /* Block number of block to free */
){
  int rc = LSM_OK;                /* Return code */
  int iFirst;                     /* First page on block iBlk */
  int iLast;                      /* Last page on block iBlk */
  Level *pLevel;                  /* Used to iterate through levels */

  int iIn;                        /* Used to iterate through append points */
  int iOut = 0;                   /* Used to output append points */
  Pgno *aApp = pSnapshot->aiAppend;

  iFirst = fsFirstPageOnBlock(pFS, iBlk);
................................................................................
  int iBlk;

  assert( pRun->nSize>0 );
  assert( 0==fsSegmentRedirects(pFS, pRun) );
  assert( nPgno>0 && 0==fsPageRedirects(pFS, pRun, aPgno[0]) );

  iBlk = fsPageToBlock(pFS, pRun->iFirst);
  pRun->nSize += (pRun->iFirst - fsFirstPageOnBlock(pFS, iBlk));

  while( rc==LSM_OK ){
    int iNext = 0;
    Pgno iFirst = firstOnBlock(pFS, iBlk, aPgno, nPgno);
    if( iFirst ){
      pRun->iFirst = iFirst;
      break;
    }
    rc = fsBlockNext(pFS, pRun, iBlk, &iNext);
    if( rc==LSM_OK ) rc = fsFreeBlock(pFS, pSnapshot, pRun, iBlk);
    pRun->nSize -= (
        1 + fsLastPageOnBlock(pFS, iBlk) - fsFirstPageOnBlock(pFS, iBlk)
    );
    iBlk = iNext;
  }

  pRun->nSize -= (pRun->iFirst - fsFirstPageOnBlock(pFS, iBlk));
  assert( pRun->nSize>0 );
}

/*
** This function is only used in compressed database mode.
**
** Argument iPg is the page number (byte offset) of a page within segment
................................................................................
  Snapshot *pSnapshot,
  Level *pLvl,
  int bDefer,
  Page **ppOut
){
  int rc = LSM_OK;
  Page *pPg = 0;
  int iApp = 0;
  int iNext = 0;
  Segment *p = &pLvl->lhs;
  int iPrev = p->iLastPg;

  *ppOut = 0;
  assert( p->pRedirect==0 );

  if( pFS->pCompress || bDefer ){
    /* In compressed database mode the page is not assigned a page number
    ** or location in the database file at this point. This will be done
................................................................................
  int nData,                      /* Size of buffer aData[] in bytes */
  int *pRc                        /* IN/OUT: Error code */
){
  Pgno iRet = 0;
  int rc = *pRc;
  assert( pFS->pCompress );
  if( rc==LSM_OK ){
    int nRem;
    int nWrite;
    Pgno iLastOnBlock;
    Pgno iApp = pSeg->iLastPg+1;

    /* If this is the first data written into the segment, find an append-point
    ** or allocate a new block.  */
    if( iApp==1 ){
      pSeg->iFirst = iApp = findAppendPoint(pFS, 0);
................................................................................
      }
    }
    iRet = iApp;

    /* Write as much data as is possible at iApp (usually all of it). */
    iLastOnBlock = fsLastPageOnPagesBlock(pFS, iApp);
    if( rc==LSM_OK ){
      int nSpace = iLastOnBlock - iApp + 1;
      nWrite = LSM_MIN(nData, nSpace);
      nRem = nData - nWrite;
      assert( nWrite>=0 );
      if( nWrite!=0 ){
        rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iApp, aData, nWrite);
      }
      iApp += nWrite;
................................................................................
    Pgno iLast2;
    Pgno iLast = pSeg->iLastPg;     /* Current last page of segment */
    int nPad;                       /* Bytes of padding required */
    u8 aSz[3];

    iLast2 = (1 + iLast/pFS->szSector) * pFS->szSector - 1;
    assert( fsPageToBlock(pFS, iLast)==fsPageToBlock(pFS, iLast2) );
    nPad = iLast2 - iLast;

    if( iLast2>fsLastPageOnPagesBlock(pFS, iLast) ){
      nPad -= 4;
    }
    assert( nPad>=0 );

    if( nPad>=6 ){







|







 







|

|







 







|







 







|
|







 







|










|





|







 







|
|

|







 







|
|







 







|







 







|







536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
...
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
....
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
....
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
....
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
....
2083
2084
2085
2086
2087
2088
2089
2090
2091
2092
2093
2094
2095
2096
2097
2098
2099
2100
....
2453
2454
2455
2456
2457
2458
2459
2460
2461
2462
2463
2464
2465
2466
2467
2468
....
2473
2474
2475
2476
2477
2478
2479
2480
2481
2482
2483
2484
2485
2486
2487
....
2796
2797
2798
2799
2800
2801
2802
2803
2804
2805
2806
2807
2808
2809
2810
  return ((i64)iReal*pFS->nPagesize <= pFS->nMapLimit);
}

/*
** Given that there are currently nHash slots in the hash table, return 
** the hash key for file iFile, page iPg.
*/
static int fsHashKey(int nHash, Pgno iPg){
  return (iPg % nHash);
}

/*
** This is a helper function for lsmFsOpen(). It opens a single file on
** disk (either the database or log file).
*/
................................................................................

/*
** Return the block number of the block that page iPg is located on. 
** Blocks are numbered starting from 1.
*/
static int fsPageToBlock(FileSystem *pFS, Pgno iPg){
  if( pFS->pCompress ){
    return (int)((iPg / pFS->nBlocksize) + 1);
  }else{
    return (int)(1 + ((iPg-1) / (pFS->nBlocksize / pFS->nPagesize)));
  }
}

/*
** Return true if page iPg is the last page on its block.
**
** This function is only called in non-compressed database mode.
................................................................................
  i64 iEob;                       /* End of block */
  int nRead;
  int rc;

  assert( pFS->pCompress );

  iEob = fsLastPageOnPagesBlock(pFS, iOff) + 1;
  nRead = (int)LSM_MIN(iEob - iOff, nData);

  rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iOff, aData, nRead);
  if( rc==LSM_OK && nRead!=nData ){
    int iBlk;

    rc = fsBlockNext(pFS, pSeg, fsPageToBlock(pFS, iOff), &iBlk);
    if( rc==LSM_OK ){
................................................................................
static int fsFreeBlock(
  FileSystem *pFS,                /* File system object */
  Snapshot *pSnapshot,            /* Worker snapshot */
  Segment *pIgnore,               /* Ignore this run when searching */
  int iBlk                        /* Block number of block to free */
){
  int rc = LSM_OK;                /* Return code */
  Pgno iFirst;                    /* First page on block iBlk */
  Pgno iLast;                     /* Last page on block iBlk */
  Level *pLevel;                  /* Used to iterate through levels */

  int iIn;                        /* Used to iterate through append points */
  int iOut = 0;                   /* Used to output append points */
  Pgno *aApp = pSnapshot->aiAppend;

  iFirst = fsFirstPageOnBlock(pFS, iBlk);
................................................................................
  int iBlk;

  assert( pRun->nSize>0 );
  assert( 0==fsSegmentRedirects(pFS, pRun) );
  assert( nPgno>0 && 0==fsPageRedirects(pFS, pRun, aPgno[0]) );

  iBlk = fsPageToBlock(pFS, pRun->iFirst);
  pRun->nSize += (int)(pRun->iFirst - fsFirstPageOnBlock(pFS, iBlk));

  while( rc==LSM_OK ){
    int iNext = 0;
    Pgno iFirst = firstOnBlock(pFS, iBlk, aPgno, nPgno);
    if( iFirst ){
      pRun->iFirst = iFirst;
      break;
    }
    rc = fsBlockNext(pFS, pRun, iBlk, &iNext);
    if( rc==LSM_OK ) rc = fsFreeBlock(pFS, pSnapshot, pRun, iBlk);
    pRun->nSize -= (int)(
        1 + fsLastPageOnBlock(pFS, iBlk) - fsFirstPageOnBlock(pFS, iBlk)
    );
    iBlk = iNext;
  }

  pRun->nSize -= (int)(pRun->iFirst - fsFirstPageOnBlock(pFS, iBlk));
  assert( pRun->nSize>0 );
}

/*
** This function is only used in compressed database mode.
**
** Argument iPg is the page number (byte offset) of a page within segment
................................................................................
  Snapshot *pSnapshot,
  Level *pLvl,
  int bDefer,
  Page **ppOut
){
  int rc = LSM_OK;
  Page *pPg = 0;
  Pgno iApp = 0;
  Pgno iNext = 0;
  Segment *p = &pLvl->lhs;
  Pgno iPrev = p->iLastPg;

  *ppOut = 0;
  assert( p->pRedirect==0 );

  if( pFS->pCompress || bDefer ){
    /* In compressed database mode the page is not assigned a page number
    ** or location in the database file at this point. This will be done
................................................................................
  int nData,                      /* Size of buffer aData[] in bytes */
  int *pRc                        /* IN/OUT: Error code */
){
  Pgno iRet = 0;
  int rc = *pRc;
  assert( pFS->pCompress );
  if( rc==LSM_OK ){
    int nRem = 0;
    int nWrite = 0;
    Pgno iLastOnBlock;
    Pgno iApp = pSeg->iLastPg+1;

    /* If this is the first data written into the segment, find an append-point
    ** or allocate a new block.  */
    if( iApp==1 ){
      pSeg->iFirst = iApp = findAppendPoint(pFS, 0);
................................................................................
      }
    }
    iRet = iApp;

    /* Write as much data as is possible at iApp (usually all of it). */
    iLastOnBlock = fsLastPageOnPagesBlock(pFS, iApp);
    if( rc==LSM_OK ){
      int nSpace = (int)(iLastOnBlock - iApp + 1);
      nWrite = LSM_MIN(nData, nSpace);
      nRem = nData - nWrite;
      assert( nWrite>=0 );
      if( nWrite!=0 ){
        rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iApp, aData, nWrite);
      }
      iApp += nWrite;
................................................................................
    Pgno iLast2;
    Pgno iLast = pSeg->iLastPg;     /* Current last page of segment */
    int nPad;                       /* Bytes of padding required */
    u8 aSz[3];

    iLast2 = (1 + iLast/pFS->szSector) * pFS->szSector - 1;
    assert( fsPageToBlock(pFS, iLast)==fsPageToBlock(pFS, iLast2) );
    nPad = (int)(iLast2 - iLast);

    if( iLast2>fsLastPageOnPagesBlock(pFS, iLast) ){
      nPad -= 4;
    }
    assert( nPad>=0 );

    if( nPad>=6 ){

Changes to ext/lsm1/lsm_log.c.

541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
...
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
...
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
    if( nPad ){
      u8 aPad[7] = {0,0,0,0,0,0,0};
      nPad = 8-nPad;
      if( nPad==1 ){
        aPad[0] = LSM_LOG_PAD1;
      }else{
        aPad[0] = LSM_LOG_PAD2;
        aPad[1] = (nPad-2);
      }
      rc = lsmStringBinAppend(&pLog->buf, aPad, nPad);
      if( rc!=LSM_OK ) return rc;
    }

    /* Append the JUMP record to the buffer. Then flush the buffer to disk
    ** and update the checksums. The next write to the log file (assuming
................................................................................
    while( nPad ){
      if( nPad==1 ){
        pLog->buf.z[pLog->buf.n++] = LSM_LOG_PAD1;
        nPad = 0;
      }else{
        int n = LSM_MIN(200, nPad-2);
        pLog->buf.z[pLog->buf.n++] = LSM_LOG_PAD2;
        pLog->buf.z[pLog->buf.n++] = n;
        nPad -= 2;
        memset(&pLog->buf.z[pLog->buf.n], 0x2B, n);
        pLog->buf.n += n;
        nPad -= n;
      }
    }
  }

  /* Make sure there is room in the log-buffer to add the CKSUM or COMMIT
  ** record. Then add the first byte of it.  */
  rc = lsmStringExtend(&pLog->buf, 9);
  if( rc!=LSM_OK ) return rc;
  pLog->buf.z[pLog->buf.n++] = eType;
  memset(&pLog->buf.z[pLog->buf.n], 0, 8);

  rc = logCksumAndFlush(pDb);

  /* If this is a commit and synchronous=full, sync the log to disk. */
  if( rc==LSM_OK && eType==LSM_LOG_COMMIT && pDb->eSafety==LSM_SAFETY_FULL ){
    rc = lsmFsSyncLog(pDb->pFS);
................................................................................
  LogWriter *pLog;

  if( pDb->bUseLog==0 ) return;
  pLog = pDb->pLogWriter;

  assert( pMark->iOff<=pLog->iOff+pLog->buf.n );
  if( (pMark->iOff & 0xFFFFFFF8)>=pLog->iOff ){
    pLog->buf.n = pMark->iOff - pLog->iOff;
    pLog->iCksumBuf = (pLog->buf.n & 0xFFFFFFF8);
  }else{
    pLog->buf.n = pMark->nBuf;
    memcpy(pLog->buf.z, pMark->aBuf, pMark->nBuf);
    pLog->iCksumBuf = 0;
    pLog->iOff = pMark->iOff - pMark->nBuf;
  }







|







 







|












|







 







|







541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
...
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
...
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
    if( nPad ){
      u8 aPad[7] = {0,0,0,0,0,0,0};
      nPad = 8-nPad;
      if( nPad==1 ){
        aPad[0] = LSM_LOG_PAD1;
      }else{
        aPad[0] = LSM_LOG_PAD2;
        aPad[1] = (u8)(nPad-2);
      }
      rc = lsmStringBinAppend(&pLog->buf, aPad, nPad);
      if( rc!=LSM_OK ) return rc;
    }

    /* Append the JUMP record to the buffer. Then flush the buffer to disk
    ** and update the checksums. The next write to the log file (assuming
................................................................................
    while( nPad ){
      if( nPad==1 ){
        pLog->buf.z[pLog->buf.n++] = LSM_LOG_PAD1;
        nPad = 0;
      }else{
        int n = LSM_MIN(200, nPad-2);
        pLog->buf.z[pLog->buf.n++] = LSM_LOG_PAD2;
        pLog->buf.z[pLog->buf.n++] = (char)n;
        nPad -= 2;
        memset(&pLog->buf.z[pLog->buf.n], 0x2B, n);
        pLog->buf.n += n;
        nPad -= n;
      }
    }
  }

  /* Make sure there is room in the log-buffer to add the CKSUM or COMMIT
  ** record. Then add the first byte of it.  */
  rc = lsmStringExtend(&pLog->buf, 9);
  if( rc!=LSM_OK ) return rc;
  pLog->buf.z[pLog->buf.n++] = (char)eType;
  memset(&pLog->buf.z[pLog->buf.n], 0, 8);

  rc = logCksumAndFlush(pDb);

  /* If this is a commit and synchronous=full, sync the log to disk. */
  if( rc==LSM_OK && eType==LSM_LOG_COMMIT && pDb->eSafety==LSM_SAFETY_FULL ){
    rc = lsmFsSyncLog(pDb->pFS);
................................................................................
  LogWriter *pLog;

  if( pDb->bUseLog==0 ) return;
  pLog = pDb->pLogWriter;

  assert( pMark->iOff<=pLog->iOff+pLog->buf.n );
  if( (pMark->iOff & 0xFFFFFFF8)>=pLog->iOff ){
    pLog->buf.n = (int)(pMark->iOff - pLog->iOff);
    pLog->iCksumBuf = (pLog->buf.n & 0xFFFFFFF8);
  }else{
    pLog->buf.n = pMark->nBuf;
    memcpy(pLog->buf.z, pMark->aBuf, pMark->nBuf);
    pLog->iCksumBuf = 0;
    pLog->iOff = pMark->iOff - pMark->nBuf;
  }

Changes to ext/lsm1/lsm_shared.c.

93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
...
635
636
637
638
639
640
641

642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
...
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
....
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
#else
# define assertNotInFreelist(x,y)
#endif

/*
** Append an entry to the free-list. If (iId==-1), this is a delete.
*/
int freelistAppend(lsm_db *db, int iBlk, i64 iId){
  lsm_env *pEnv = db->pEnv;
  Freelist *p;
  int i; 

  assert( iId==-1 || iId>=0 );
  p = db->bUseFreelist ? db->pFreelist : &db->pWorker->freelist;

................................................................................
*/
static int walkFreelistCb(void *pCtx, int iBlk, i64 iSnapshot){
  WalkFreelistCtx *p = (WalkFreelistCtx *)pCtx;
  const int iDir = (p->bReverse ? -1 : 1);
  Freelist *pFree = p->pFreelist;

  assert( p->bDone==0 );

  if( pFree ){
    while( (p->iFree < pFree->nEntry) && p->iFree>=0 ){
      FreelistEntry *pEntry = &pFree->aEntry[p->iFree];
      if( (p->bReverse==0 && pEntry->iBlk>iBlk)
       || (p->bReverse!=0 && pEntry->iBlk<iBlk)
      ){
        break;
      }else{
        p->iFree += iDir;
        if( pEntry->iId>=0 
            && p->xUsr(p->pUsrctx, pEntry->iBlk, pEntry->iId) 
          ){
          p->bDone = 1;
          return 1;
        }
        if( pEntry->iBlk==iBlk ) return 0;
      }
    }
  }

  if( p->xUsr(p->pUsrctx, iBlk, iSnapshot) ){
    p->bDone = 1;
    return 1;
................................................................................
    /* Check if this checkpoint has already been written to the database
    ** file. If so, set variable bDone to true.  */
    if( pShm->iMetaPage ){
      MetaPage *pPg;              /* Meta page */
      u8 *aData;                  /* Meta-page data buffer */
      int nData;                  /* Size of aData[] in bytes */
      i64 iCkpt;                  /* Id of checkpoint just loaded */
      i64 iDisk;                  /* Id of checkpoint already stored in db */
      iCkpt = lsmCheckpointId(pDb->aSnapshot, 0);
      rc = lsmFsMetaPageGet(pDb->pFS, 0, pShm->iMetaPage, &pPg);
      if( rc==LSM_OK ){
        aData = lsmFsMetaPageData(pPg, &nData);
        iDisk = lsmCheckpointId((u32 *)aData, 1);
        nWrite = lsmCheckpointNWrite((u32 *)aData, 1);
        lsmFsMetaPageRelease(pPg);
................................................................................
** follows:
**
**   (eOp==LSM_LOCK_UNLOCK) -> true if db has no lock on iLock
**   (eOp==LSM_LOCK_SHARED) -> true if db has at least a SHARED lock on iLock.
**   (eOp==LSM_LOCK_EXCL)   -> true if db has an EXCLUSIVE lock on iLock.
*/
int lsmShmAssertLock(lsm_db *db, int iLock, int eOp){
  int ret;
  int eHave;

  assert( iLock>=1 && iLock<=LSM_LOCK_READER(LSM_LOCK_NREADER-1) );
  assert( iLock<=16 );
  assert( eOp==LSM_LOCK_UNLOCK || eOp==LSM_LOCK_SHARED || eOp==LSM_LOCK_EXCL );

  eHave = shmLockType(db, iLock);







|







 







>



|
|










|







 







|







 







|







93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
...
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
...
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
....
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
#else
# define assertNotInFreelist(x,y)
#endif

/*
** Append an entry to the free-list. If (iId==-1), this is a delete.
*/
int freelistAppend(lsm_db *db, u32 iBlk, i64 iId){
  lsm_env *pEnv = db->pEnv;
  Freelist *p;
  int i; 

  assert( iId==-1 || iId>=0 );
  p = db->bUseFreelist ? db->pFreelist : &db->pWorker->freelist;

................................................................................
*/
static int walkFreelistCb(void *pCtx, int iBlk, i64 iSnapshot){
  WalkFreelistCtx *p = (WalkFreelistCtx *)pCtx;
  const int iDir = (p->bReverse ? -1 : 1);
  Freelist *pFree = p->pFreelist;

  assert( p->bDone==0 );
  assert( iBlk>=0 );
  if( pFree ){
    while( (p->iFree < pFree->nEntry) && p->iFree>=0 ){
      FreelistEntry *pEntry = &pFree->aEntry[p->iFree];
      if( (p->bReverse==0 && pEntry->iBlk>(u32)iBlk)
       || (p->bReverse!=0 && pEntry->iBlk<(u32)iBlk)
      ){
        break;
      }else{
        p->iFree += iDir;
        if( pEntry->iId>=0 
            && p->xUsr(p->pUsrctx, pEntry->iBlk, pEntry->iId) 
          ){
          p->bDone = 1;
          return 1;
        }
        if( pEntry->iBlk==(u32)iBlk ) return 0;
      }
    }
  }

  if( p->xUsr(p->pUsrctx, iBlk, iSnapshot) ){
    p->bDone = 1;
    return 1;
................................................................................
    /* Check if this checkpoint has already been written to the database
    ** file. If so, set variable bDone to true.  */
    if( pShm->iMetaPage ){
      MetaPage *pPg;              /* Meta page */
      u8 *aData;                  /* Meta-page data buffer */
      int nData;                  /* Size of aData[] in bytes */
      i64 iCkpt;                  /* Id of checkpoint just loaded */
      i64 iDisk = 0;              /* Id of checkpoint already stored in db */
      iCkpt = lsmCheckpointId(pDb->aSnapshot, 0);
      rc = lsmFsMetaPageGet(pDb->pFS, 0, pShm->iMetaPage, &pPg);
      if( rc==LSM_OK ){
        aData = lsmFsMetaPageData(pPg, &nData);
        iDisk = lsmCheckpointId((u32 *)aData, 1);
        nWrite = lsmCheckpointNWrite((u32 *)aData, 1);
        lsmFsMetaPageRelease(pPg);
................................................................................
** follows:
**
**   (eOp==LSM_LOCK_UNLOCK) -> true if db has no lock on iLock
**   (eOp==LSM_LOCK_SHARED) -> true if db has at least a SHARED lock on iLock.
**   (eOp==LSM_LOCK_EXCL)   -> true if db has an EXCLUSIVE lock on iLock.
*/
int lsmShmAssertLock(lsm_db *db, int iLock, int eOp){
  int ret = 0;
  int eHave;

  assert( iLock>=1 && iLock<=LSM_LOCK_READER(LSM_LOCK_NREADER-1) );
  assert( iLock<=16 );
  assert( eOp==LSM_LOCK_UNLOCK || eOp==LSM_LOCK_SHARED || eOp==LSM_LOCK_EXCL );

  eHave = shmLockType(db, iLock);

Changes to ext/lsm1/lsm_sorted.c.

658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
...
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
...
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
...
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
...
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
...
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
...
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
....
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
....
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
....
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
....
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
....
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
....
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
....
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
....
2613
2614
2615
2616
2617
2618
2619
2620
2621
2622
2623
2624
2625
2626
2627
....
3471
3472
3473
3474
3475
3476
3477
3478
3479
3480
3481
3482
3483
3484
3485
....
3497
3498
3499
3500
3501
3502
3503
3504
3505
3506
3507
3508
3509
3510
3511
....
3616
3617
3618
3619
3620
3621
3622
3623
3624
3625
3626
3627
3628
3629
3630
3631
3632
....
3659
3660
3661
3662
3663
3664
3665
3666
3667
3668
3669
3670
3671
3672
3673
3674
3675
3676
3677
3678
3679
3680
3681
....
3914
3915
3916
3917
3918
3919
3920
3921
3922
3923
3924
3925
3926
3927
3928
3929
3930
3931
3932
3933
3934
3935
3936
3937
....
3949
3950
3951
3952
3953
3954
3955
3956
3957
3958
3959
3960
3961
3962
3963
3964
3965
3966
3967
....
3969
3970
3971
3972
3973
3974
3975
3976
3977
3978
3979
3980
3981
3982
3983
....
3995
3996
3997
3998
3999
4000
4001
4002
4003
4004
4005
4006
4007
4008
4009
....
4029
4030
4031
4032
4033
4034
4035
4036
4037
4038
4039
4040
4041
4042
4043
4044
4045
4046
4047
4048
....
4256
4257
4258
4259
4260
4261
4262
4263
4264
4265
4266
4267
4268
4269
4270
....
4577
4578
4579
4580
4581
4582
4583
4584
4585
4586
4587
4588
4589
4590
4591
....
5180
5181
5182
5183
5184
5185
5186
5187
5188
5189
5190
5191
5192
5193
5194
....
5518
5519
5520
5521
5522
5523
5524
5525
5526
5527
5528
5529
5530
5531
5532
5533
5534
5535
5536
5537
5538
5539
5540
5541
5542
5543
....
5589
5590
5591
5592
5593
5594
5595
5596
5597
5598
5599
5600
5601
5602
5603
....
5701
5702
5703
5704
5705
5706
5707
5708
5709
5710
5711
5712
5713
5714
5715
}

static int btreeCursorPtr(u8 *aData, int nData, int iCell){
  int nCell;

  nCell = pageGetNRec(aData, nData);
  if( iCell>=nCell ){
    return pageGetPtr(aData, nData);
  }
  return pageGetRecordPtr(aData, nData, iCell);
}

static int btreeCursorNext(BtreeCursor *pCsr){
  int rc = LSM_OK;

  BtreePg *pPg = &pCsr->aPg[pCsr->iPg];
  int nCell; 
................................................................................
}

static int btreeCursorFirst(BtreeCursor *pCsr){
  int rc;

  Page *pPg = 0;
  FileSystem *pFS = pCsr->pFS;
  int iPg = pCsr->pSeg->iRoot;

  do {
    rc = lsmFsDbPageGet(pFS, pCsr->pSeg, iPg, &pPg);
    assert( (rc==LSM_OK)==(pPg!=0) );
    if( rc==LSM_OK ){
      u8 *aData;
      int nData;
................................................................................
        }
      }

      if( rc==LSM_OK ){
        assert( pCsr->aPg[pCsr->nDepth].iCell==0 );
        pCsr->aPg[pCsr->nDepth].pPage = pPg;
        pCsr->nDepth++;
        iPg = pageGetRecordPtr(aData, nData, 0);
      }
    }
  }while( rc==LSM_OK );
  lsmFsPageRelease(pPg);
  pCsr->iPg = pCsr->nDepth-1;

  if( rc==LSM_OK && pCsr->nDepth ){
................................................................................
    /* Populate any other aPg[] array entries */
    if( rc==LSM_OK && nDepth>1 ){
      Blob blob = {0,0,0};
      void *pSeek;
      int nSeek;
      int iTopicSeek;
      int iPg = 0;
      int iLoad = pSeg->iRoot;
      Page *pPg = pCsr->aPg[nDepth-1].pPage;
 
      if( pageObjGetNRec(pPg)==0 ){
        /* This can happen when pPg is the right-most leaf in the b-tree.
        ** In this case, set the iTopicSeek/pSeek/nSeek key to a value
        ** greater than any real key.  */
        assert( iCell==-1 );
................................................................................
          int iMin;
          int iMax;
          int iCell;

          aData = fsPageData(pPg, &nData);
          assert( (pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG) );

          iLoad = pageGetPtr(aData, nData);
          iCell = pageGetNRec(aData, nData); 
          iMax = iCell-1;
          iMin = 0;

          while( iMax>=iMin ){
            int iTry = (iMin+iMax)/2;
            void *pKey; int nKey;         /* Key for cell iTry */
................................................................................

            res = sortedKeyCompare(
                xCmp, iTopicSeek, pSeek, nSeek, iTopic, pKey, nKey
            );
            assert( res!=0 );

            if( res<0 ){
              iLoad = iPtr;
              iCell = iTry;
              iMax = iTry-1;
            }else{
              iMin = iTry+1;
            }
          }

................................................................................

static void segmentPtrSetPage(SegmentPtr *pPtr, Page *pNext){
  lsmFsPageRelease(pPtr->pPg);
  if( pNext ){
    int nData;
    u8 *aData = fsPageData(pNext, &nData);
    pPtr->nCell = pageGetNRec(aData, nData);
    pPtr->flags = pageGetFlags(aData, nData);
    pPtr->iPtr = pageGetPtr(aData, nData);
  }
  pPtr->pPg = pNext;
}

/*
** Load a new page into the SegmentPtr object pPtr.
................................................................................
  int iTopic,                     /* Key topic to seek to */
  void *pKey, int nKey,           /* Key to seek to */
  int eSeek,                      /* Search bias - see above */
  int *piPtr,                     /* OUT: FC pointer */
  int *pbStop
){
  int (*xCmp)(void *, int, void *, int) = pCsr->pDb->xCmp;
  int res;                        /* Result of comparison operation */
  int rc = LSM_OK;
  int iMin;
  int iMax;
  Pgno iPtrOut = 0;

  /* If the current page contains an oversized entry, then there are no
  ** pointers to one or more of the subsequent pages in the sorted run.
................................................................................
    ){
      assert( eSeek!=LSM_SEEK_EQ );
      rc = segmentPtrAdvance(pCsr, pPtr, eSeek==LSM_SEEK_LE);
    }
  }

  assert( rc!=LSM_OK || assertSeekResult(pCsr,pPtr,iTopic,pKey,nKey,eSeek) );
  *piPtr = iPtrOut;
  return rc;
}

static int seekInBtree(
  MultiCursor *pCsr,              /* Multi-cursor object */
  Segment *pSeg,                  /* Seek within this segment */
  int iTopic,
................................................................................
){
  int i = 0;
  int rc;
  int iPg;
  Page *pPg = 0;
  Blob blob = {0, 0, 0};

  iPg = pSeg->iRoot;
  do {
    Pgno *piFirst = 0;
    if( aPg ){
      aPg[i++] = iPg;
      piFirst = &aPg[i];
    }

................................................................................
      int nRec;
      int flags;

      aData = fsPageData(pPg, &nData);
      flags = pageGetFlags(aData, nData);
      if( (flags & SEGMENT_BTREE_FLAG)==0 ) break;

      iPg = pageGetPtr(aData, nData);
      nRec = pageGetNRec(aData, nData);

      iMin = 0;
      iMax = nRec-1;
      while( iMax>=iMin ){
        int iTry = (iMin+iMax)/2;
        void *pKeyT; int nKeyT;       /* Key for cell iTry */
................................................................................
          i++;
        }

        res = sortedKeyCompare(
            pCsr->pDb->xCmp, iTopic, pKey, nKey, iTopicT, pKeyT, nKeyT
        );
        if( res<0 ){
          iPg = iPtr;
          iMax = iTry-1;
        }else{
          iMin = iTry+1;
        }
      }
      lsmFsPageRelease(pPg);
      pPg = 0;
................................................................................
  if( pPtr->pSeg->iRoot ){
    Page *pPg;
    assert( pPtr->pSeg->iRoot!=0 );
    rc = seekInBtree(pCsr, pPtr->pSeg, iTopic, pKey, nKey, 0, &pPg);
    if( rc==LSM_OK ) segmentPtrSetPage(pPtr, pPg);
  }else{
    if( iPtr==0 ){
      iPtr = pPtr->pSeg->iFirst;
    }
    if( rc==LSM_OK ){
      rc = segmentPtrLoadPage(pCsr->pDb->pFS, pPtr, iPtr);
    }
  }

  if( rc==LSM_OK ){
................................................................................
  }

  /* If (res<0), then key pKey/nKey is smaller than the split-key (or this
  ** is not a composite level and there is no split-key). Search the 
  ** left-hand-side of the level in this case.  */
  if( res<0 ){
    int iPtr = 0;
    if( nRhs==0 ) iPtr = *piPgno;

    rc = seekInSegment(
        pCsr, &aPtr[0], iTopic, pKey, nKey, iPtr, eSeek, &iOut, &bStop
    );
    if( rc==LSM_OK && nRhs>0 && eSeek==LSM_SEEK_GE && aPtr[0].pPg==0 ){
      res = 0;
    }
  }
  
  if( res>=0 ){
    int bHit = 0;                 /* True if at least one rhs is not EOF */
    int iPtr = *piPgno;
    int i;
    for(i=1; rc==LSM_OK && i<=nRhs && bStop==0; i++){
      SegmentPtr *pPtr = &aPtr[i];
      iOut = 0;
      rc = seekInSegment(
          pCsr, pPtr, iTopic, pKey, nKey, iPtr, eSeek, &iOut, &bStop
      );
................................................................................
      rc = lsmMCursorLast(pCsr);
    }else{
      rc = lsmMCursorSeek(pCsr, 1, "", 0, LSM_SEEK_GE);
    }

    while( rc==LSM_OK && lsmMCursorValid(pCsr) && rtIsSystem(pCsr->eType) ){
      void *pKey; int nKey;
      void *pVal; int nVal;

      rc = lsmMCursorKey(pCsr, &pKey, &nKey);
      if( rc==LSM_OK ) rc = lsmMCursorValue(pCsr, &pVal, &nVal);
      if( rc==LSM_OK && (nKey!=4 || nVal!=8) ) rc = LSM_CORRUPT_BKPT;

      if( rc==LSM_OK ){
        int iBlk;
................................................................................
  p = &pMW->hier;

  if( p->apHier==0 && pSeg->iRoot!=0 ){
    FileSystem *pFS = pMW->pDb->pFS;
    lsm_env *pEnv = pMW->pDb->pEnv;
    Page **apHier = 0;
    int nHier = 0;
    int iPg = pSeg->iRoot;

    do {
      Page *pPg = 0;
      u8 *aData;
      int nData;
      int flags;

................................................................................
          break;
        }
        apHier = apNew;
        memmove(&apHier[1], &apHier[0], sizeof(Page *) * nHier);
        nHier++;

        apHier[0] = pPg;
        iPg = pageGetPtr(aData, nData);
      }else{
        lsmFsPageRelease(pPg);
        break;
      }
    }while( 1 );

    if( rc==LSM_OK ){
................................................................................

      /* If the key will fit on this page, break out of the loop here.
      ** The new entry will be written to page apHier[iLevel]. */
      pOld = p->apHier[iLevel];
      assert( lsmFsPageWritable(pOld) );
      aData = fsPageData(pOld, &nData);
      if( eType==0 ){
        nByte = 2 + 1 + lsmVarintLen32(iPtr) + lsmVarintLen32(iKeyPg);
      }else{
        nByte = 2 + 1 + lsmVarintLen32(iPtr) + lsmVarintLen32(nKey) + nKey;
      }
      nRec = pageGetNRec(aData, nData);
      nFree = SEGMENT_EOF(nData, nRec) - mergeWorkerPageOffset(aData, nData);
      if( nByte<=nFree ) break;

      /* Otherwise, this page is full. Set the right-hand-child pointer
      ** to iPtr and release it.  */
................................................................................
    }
  }

  /* Write the key into page apHier[iLevel]. */
  aData = fsPageData(p->apHier[iLevel], &nData);
  iOff = mergeWorkerPageOffset(aData, nData);
  nRec = pageGetNRec(aData, nData);
  lsmPutU16(&aData[SEGMENT_CELLPTR_OFFSET(nData, nRec)], iOff);
  lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], nRec+1);
  if( eType==0 ){
    aData[iOff++] = 0x00;
    iOff += lsmVarintPut32(&aData[iOff], iPtr);
    iOff += lsmVarintPut32(&aData[iOff], iKeyPg);
  }else{
    aData[iOff++] = eType;
    iOff += lsmVarintPut32(&aData[iOff], iPtr);
    iOff += lsmVarintPut32(&aData[iOff], nKey);
    memcpy(&aData[iOff], pKey, nKey);
  }

  return rc;
}

................................................................................
  int iFPtr = 0;                  /* Pointer value read from footer of pPg */
  MultiCursor *pCsr = pMW->pCsr;

  assert( pMW->pPage==0 );

  if( pCsr->pBtCsr ){
    rc = LSM_OK;
    iFPtr = pMW->pLevel->pNext->lhs.iFirst;
  }else if( pCsr->nPtr>0 ){
    Segment *pSeg;
    pSeg = pCsr->aPtr[pCsr->nPtr-1].pSeg;
    rc = lsmFsDbPageGet(pMW->pDb->pFS, pSeg, pSeg->iFirst, &pPg);
    if( rc==LSM_OK ){
      u8 *aData;                    /* Buffer for page pPg */
      int nData;                    /* Size of aData[] in bytes */
      aData = fsPageData(pPg, &nData);
      iFPtr = pageGetPtr(aData, nData);
      lsmFsPageRelease(pPg);
    }
  }

  if( rc==LSM_OK ){
    rc = mergeWorkerNextPage(pMW, iFPtr);
    if( pCsr->pPrevMergePtr ) *pCsr->pPrevMergePtr = iFPtr;
................................................................................
  int iPtr                        /* Absolute value of page pointer, or 0 */
){
  int rc = LSM_OK;                /* Return code */
  Merge *pMerge;                  /* Persistent part of level merge state */
  int nHdr;                       /* Space required for this record header */
  Page *pPg;                      /* Page to write to */
  u8 *aData;                      /* Data buffer for page pWriter->pPage */
  int nData;                      /* Size of buffer aData[] in bytes */
  int nRec;                       /* Number of records on page pPg */
  int iFPtr;                      /* Value of pointer in footer of pPg */
  int iRPtr = 0;                  /* Value of pointer written into record */
  int iOff;                       /* Current write offset within page pPg */
  Segment *pSeg;                  /* Segment being written */
  int flags = 0;                  /* If != 0, flags value for page footer */
  int bFirst = 0;                 /* True for first key of output run */

  pMerge = pMW->pLevel->pMerge;    
  pSeg = &pMW->pLevel->lhs;

................................................................................
    rc = mergeWorkerFirstPage(pMW);
    bFirst = 1;
  }
  pPg = pMW->pPage;
  if( pPg ){
    aData = fsPageData(pPg, &nData);
    nRec = pageGetNRec(aData, nData);
    iFPtr = pageGetPtr(aData, nData);
    iRPtr = iPtr - iFPtr;
  }
     
  /* Figure out how much space is required by the new record. The space
  ** required is divided into two sections: the header and the body. The
  ** header consists of the intial varint fields. The body are the blobs 
  ** of data that correspond to the key and value data. The entire header 
................................................................................
    nHdr = 1 + lsmVarintLen32(iRPtr) + lsmVarintLen32(nKey);
    if( rtIsWrite(eType) ) nHdr += lsmVarintLen32(nVal);

    /* If the entire header will not fit on page pPg, or if page pPg is 
    ** marked read-only, advance to the next page of the output run. */
    iOff = pMerge->iOutputOff;
    if( iOff<0 || pPg==0 || iOff+nHdr > SEGMENT_EOF(nData, nRec+1) ){
      iFPtr = *pMW->pCsr->pPrevMergePtr;
      iRPtr = iPtr - iFPtr;
      iOff = 0;
      nRec = 0;
      rc = mergeWorkerNextPage(pMW, iFPtr);
      pPg = pMW->pPage;
    }
  }
................................................................................
  }

  /* Update the output segment */
  if( rc==LSM_OK ){
    aData = fsPageData(pPg, &nData);

    /* Update the page footer. */
    lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], nRec+1);
    lsmPutU16(&aData[SEGMENT_CELLPTR_OFFSET(nData, nRec)], iOff);
    if( flags ) lsmPutU16(&aData[SEGMENT_FLAGS_OFFSET(nData)], flags);

    /* Write the entry header into the current page. */
    aData[iOff++] = eType;                                               /* 1 */
    iOff += lsmVarintPut32(&aData[iOff], iRPtr);                         /* 2 */
    iOff += lsmVarintPut32(&aData[iOff], nKey);                          /* 3 */
    if( rtIsWrite(eType) ) iOff += lsmVarintPut32(&aData[iOff], nVal);   /* 4 */
    pMerge->iOutputOff = iOff;

    /* Write the key and data into the segment. */
    assert( iFPtr==pageGetPtr(aData, nData) );
................................................................................
      rc = multiCursorGetVal(pCsr, iVal, &pVal, &nVal);
      if( pVal && rc==LSM_OK ){
        assert( nVal>=0 );
        rc = sortedBlobSet(pDb->pEnv, &pCsr->val, pVal, nVal);
        pVal = pCsr->val.pData;
      }
      if( rc==LSM_OK ){
        rc = mergeWorkerWrite(pMW, eType, pKey, nKey, pVal, nVal, iPtr);
      }
    }
  }

  /* Advance the cursor to the next input record (assuming one exists). */
  assert( lsmMCursorValid(pMW->pCsr) );
  if( rc==LSM_OK ) rc = lsmMCursorNext(pMW->pCsr);
................................................................................
      int i;
      for(i=0; rc==LSM_OK && i<pCsr->nPtr; i++){
        MergeInput *pInput = &pMerge->aInput[i];
        if( pInput->iPg ){
          SegmentPtr *pPtr;
          assert( pCsr->aPtr[i].pPg==0 );
          pPtr = &pCsr->aPtr[i];
          rc = segmentPtrLoadPage(pDb->pFS, pPtr, pInput->iPg);
          if( rc==LSM_OK && pPtr->nCell>0 ){
            rc = segmentPtrLoadCell(pPtr, pInput->iCell);
          }
        }
      }

      if( rc==LSM_OK && pCsr->pBtCsr ){
................................................................................
    u32 nUnsync;
    int nPgsz;

    lsmCheckpointSynced(pDb, 0, 0, &nSync);
    nUnsync = lsmCheckpointNWrite(pDb->pShmhdr->aSnap1, 0);
    nPgsz = lsmCheckpointPgsz(pDb->pShmhdr->aSnap1);

    nMax = LSM_MIN(nMax, (pDb->nAutockpt/nPgsz) - (int)(nUnsync-nSync));
    if( nMax<nRem ){
      bCkpt = 1;
      nRem = LSM_MAX(nMax, 0);
    }
  }

  /* If there exists in-memory data ready to be flushed to disk, attempt
................................................................................
  int flags;
  u8 *aData;
  int nData;

  aData = fsPageData(pPg, &nData);

  nRec = pageGetNRec(aData, nData);
  iPtr = pageGetPtr(aData, nData);
  flags = pageGetFlags(aData, nData);

  lsmStringInit(&s, pDb->pEnv);
  lsmStringAppendf(&s,"nCell=%d iPtr=%d flags=%d {", nRec, iPtr, flags);
  if( flags&SEGMENT_BTREE_FLAG ) iPtr = 0;

  for(i=0; i<nRec; i++){
    Page *pRef = 0;               /* Pointer to page iRef */
    int iChar;
    u8 *aKey; int nKey = 0;       /* Key */
    u8 *aVal; int nVal = 0;       /* Value */
    int iTopic;
    u8 *aCell;
    int iPgPtr;
    int eType;

    aCell = pageGetCell(aData, nData, i);
    eType = *aCell++;
................................................................................
  int *piPgPtr,
  u8 **paKey, int *pnKey,
  u8 **paVal, int *pnVal,
  Blob *pBlob
){
  u8 *aData; int nData;           /* Page data */
  u8 *aKey; int nKey = 0;         /* Key */
  u8 *aVal; int nVal = 0;         /* Value */
  int eType;
  int iPgPtr;
  Page *pRef = 0;                 /* Pointer to page iRef */
  u8 *aCell;

  aData = fsPageData(pPg, &nData);

................................................................................
    int iPtr;
    int flags;
    int iCell;
    u8 *aData; int nData;         /* Page data and size thereof */

    aData = fsPageData(pPg, &nData);
    nRec = pageGetNRec(aData, nData);
    iPtr = pageGetPtr(aData, nData);
    flags = pageGetFlags(aData, nData);

    lsmStringInit(&str, pDb->pEnv);
    lsmStringAppendf(&str, "Page : %lld  (%d bytes)\n", iPg, nData);
    lsmStringAppendf(&str, "nRec : %d\n", nRec);
    lsmStringAppendf(&str, "iPtr : %d\n", iPtr);
    lsmStringAppendf(&str, "flags: %04x\n", flags);







|

|







 







|







 







|







 







|







 







|







 







|







 







|







 







|







 







|







 







|







 







|







 







|







 







|







 







|











|







 







|







 







|







 







|







 







|

|







 







|
|


|
|


|







 







|








|







 







|
|
|

|







 







|







 







|







 







|
|
|


|







 







|







 







|







 







|







 







|










|







 







|







 







|







658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
...
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
...
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
...
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
...
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
...
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
...
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
....
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
....
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
....
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
....
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
....
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
....
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
....
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
....
2613
2614
2615
2616
2617
2618
2619
2620
2621
2622
2623
2624
2625
2626
2627
....
3471
3472
3473
3474
3475
3476
3477
3478
3479
3480
3481
3482
3483
3484
3485
....
3497
3498
3499
3500
3501
3502
3503
3504
3505
3506
3507
3508
3509
3510
3511
....
3616
3617
3618
3619
3620
3621
3622
3623
3624
3625
3626
3627
3628
3629
3630
3631
3632
....
3659
3660
3661
3662
3663
3664
3665
3666
3667
3668
3669
3670
3671
3672
3673
3674
3675
3676
3677
3678
3679
3680
3681
....
3914
3915
3916
3917
3918
3919
3920
3921
3922
3923
3924
3925
3926
3927
3928
3929
3930
3931
3932
3933
3934
3935
3936
3937
....
3949
3950
3951
3952
3953
3954
3955
3956
3957
3958
3959
3960
3961
3962
3963
3964
3965
3966
3967
....
3969
3970
3971
3972
3973
3974
3975
3976
3977
3978
3979
3980
3981
3982
3983
....
3995
3996
3997
3998
3999
4000
4001
4002
4003
4004
4005
4006
4007
4008
4009
....
4029
4030
4031
4032
4033
4034
4035
4036
4037
4038
4039
4040
4041
4042
4043
4044
4045
4046
4047
4048
....
4256
4257
4258
4259
4260
4261
4262
4263
4264
4265
4266
4267
4268
4269
4270
....
4577
4578
4579
4580
4581
4582
4583
4584
4585
4586
4587
4588
4589
4590
4591
....
5180
5181
5182
5183
5184
5185
5186
5187
5188
5189
5190
5191
5192
5193
5194
....
5518
5519
5520
5521
5522
5523
5524
5525
5526
5527
5528
5529
5530
5531
5532
5533
5534
5535
5536
5537
5538
5539
5540
5541
5542
5543
....
5589
5590
5591
5592
5593
5594
5595
5596
5597
5598
5599
5600
5601
5602
5603
....
5701
5702
5703
5704
5705
5706
5707
5708
5709
5710
5711
5712
5713
5714
5715
}

static int btreeCursorPtr(u8 *aData, int nData, int iCell){
  int nCell;

  nCell = pageGetNRec(aData, nData);
  if( iCell>=nCell ){
    return (int)pageGetPtr(aData, nData);
  }
  return (int)pageGetRecordPtr(aData, nData, iCell);
}

static int btreeCursorNext(BtreeCursor *pCsr){
  int rc = LSM_OK;

  BtreePg *pPg = &pCsr->aPg[pCsr->iPg];
  int nCell; 
................................................................................
}

static int btreeCursorFirst(BtreeCursor *pCsr){
  int rc;

  Page *pPg = 0;
  FileSystem *pFS = pCsr->pFS;
  int iPg = (int)pCsr->pSeg->iRoot;

  do {
    rc = lsmFsDbPageGet(pFS, pCsr->pSeg, iPg, &pPg);
    assert( (rc==LSM_OK)==(pPg!=0) );
    if( rc==LSM_OK ){
      u8 *aData;
      int nData;
................................................................................
        }
      }

      if( rc==LSM_OK ){
        assert( pCsr->aPg[pCsr->nDepth].iCell==0 );
        pCsr->aPg[pCsr->nDepth].pPage = pPg;
        pCsr->nDepth++;
        iPg = (int)pageGetRecordPtr(aData, nData, 0);
      }
    }
  }while( rc==LSM_OK );
  lsmFsPageRelease(pPg);
  pCsr->iPg = pCsr->nDepth-1;

  if( rc==LSM_OK && pCsr->nDepth ){
................................................................................
    /* Populate any other aPg[] array entries */
    if( rc==LSM_OK && nDepth>1 ){
      Blob blob = {0,0,0};
      void *pSeek;
      int nSeek;
      int iTopicSeek;
      int iPg = 0;
      int iLoad = (int)pSeg->iRoot;
      Page *pPg = pCsr->aPg[nDepth-1].pPage;
 
      if( pageObjGetNRec(pPg)==0 ){
        /* This can happen when pPg is the right-most leaf in the b-tree.
        ** In this case, set the iTopicSeek/pSeek/nSeek key to a value
        ** greater than any real key.  */
        assert( iCell==-1 );
................................................................................
          int iMin;
          int iMax;
          int iCell;

          aData = fsPageData(pPg, &nData);
          assert( (pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG) );

          iLoad = (int)pageGetPtr(aData, nData);
          iCell = pageGetNRec(aData, nData); 
          iMax = iCell-1;
          iMin = 0;

          while( iMax>=iMin ){
            int iTry = (iMin+iMax)/2;
            void *pKey; int nKey;         /* Key for cell iTry */
................................................................................

            res = sortedKeyCompare(
                xCmp, iTopicSeek, pSeek, nSeek, iTopic, pKey, nKey
            );
            assert( res!=0 );

            if( res<0 ){
              iLoad = (int)iPtr;
              iCell = iTry;
              iMax = iTry-1;
            }else{
              iMin = iTry+1;
            }
          }

................................................................................

static void segmentPtrSetPage(SegmentPtr *pPtr, Page *pNext){
  lsmFsPageRelease(pPtr->pPg);
  if( pNext ){
    int nData;
    u8 *aData = fsPageData(pNext, &nData);
    pPtr->nCell = pageGetNRec(aData, nData);
    pPtr->flags = (u16)pageGetFlags(aData, nData);
    pPtr->iPtr = pageGetPtr(aData, nData);
  }
  pPtr->pPg = pNext;
}

/*
** Load a new page into the SegmentPtr object pPtr.
................................................................................
  int iTopic,                     /* Key topic to seek to */
  void *pKey, int nKey,           /* Key to seek to */
  int eSeek,                      /* Search bias - see above */
  int *piPtr,                     /* OUT: FC pointer */
  int *pbStop
){
  int (*xCmp)(void *, int, void *, int) = pCsr->pDb->xCmp;
  int res = 0;                        /* Result of comparison operation */
  int rc = LSM_OK;
  int iMin;
  int iMax;
  Pgno iPtrOut = 0;

  /* If the current page contains an oversized entry, then there are no
  ** pointers to one or more of the subsequent pages in the sorted run.
................................................................................
    ){
      assert( eSeek!=LSM_SEEK_EQ );
      rc = segmentPtrAdvance(pCsr, pPtr, eSeek==LSM_SEEK_LE);
    }
  }

  assert( rc!=LSM_OK || assertSeekResult(pCsr,pPtr,iTopic,pKey,nKey,eSeek) );
  *piPtr = (int)iPtrOut;
  return rc;
}

static int seekInBtree(
  MultiCursor *pCsr,              /* Multi-cursor object */
  Segment *pSeg,                  /* Seek within this segment */
  int iTopic,
................................................................................
){
  int i = 0;
  int rc;
  int iPg;
  Page *pPg = 0;
  Blob blob = {0, 0, 0};

  iPg = (int)pSeg->iRoot;
  do {
    Pgno *piFirst = 0;
    if( aPg ){
      aPg[i++] = iPg;
      piFirst = &aPg[i];
    }

................................................................................
      int nRec;
      int flags;

      aData = fsPageData(pPg, &nData);
      flags = pageGetFlags(aData, nData);
      if( (flags & SEGMENT_BTREE_FLAG)==0 ) break;

      iPg = (int)pageGetPtr(aData, nData);
      nRec = pageGetNRec(aData, nData);

      iMin = 0;
      iMax = nRec-1;
      while( iMax>=iMin ){
        int iTry = (iMin+iMax)/2;
        void *pKeyT; int nKeyT;       /* Key for cell iTry */
................................................................................
          i++;
        }

        res = sortedKeyCompare(
            pCsr->pDb->xCmp, iTopic, pKey, nKey, iTopicT, pKeyT, nKeyT
        );
        if( res<0 ){
          iPg = (int)iPtr;
          iMax = iTry-1;
        }else{
          iMin = iTry+1;
        }
      }
      lsmFsPageRelease(pPg);
      pPg = 0;
................................................................................
  if( pPtr->pSeg->iRoot ){
    Page *pPg;
    assert( pPtr->pSeg->iRoot!=0 );
    rc = seekInBtree(pCsr, pPtr->pSeg, iTopic, pKey, nKey, 0, &pPg);
    if( rc==LSM_OK ) segmentPtrSetPage(pPtr, pPg);
  }else{
    if( iPtr==0 ){
      iPtr = (int)pPtr->pSeg->iFirst;
    }
    if( rc==LSM_OK ){
      rc = segmentPtrLoadPage(pCsr->pDb->pFS, pPtr, iPtr);
    }
  }

  if( rc==LSM_OK ){
................................................................................
  }

  /* If (res<0), then key pKey/nKey is smaller than the split-key (or this
  ** is not a composite level and there is no split-key). Search the 
  ** left-hand-side of the level in this case.  */
  if( res<0 ){
    int iPtr = 0;
    if( nRhs==0 ) iPtr = (int)*piPgno;

    rc = seekInSegment(
        pCsr, &aPtr[0], iTopic, pKey, nKey, iPtr, eSeek, &iOut, &bStop
    );
    if( rc==LSM_OK && nRhs>0 && eSeek==LSM_SEEK_GE && aPtr[0].pPg==0 ){
      res = 0;
    }
  }
  
  if( res>=0 ){
    int bHit = 0;                 /* True if at least one rhs is not EOF */
    int iPtr = (int)*piPgno;
    int i;
    for(i=1; rc==LSM_OK && i<=nRhs && bStop==0; i++){
      SegmentPtr *pPtr = &aPtr[i];
      iOut = 0;
      rc = seekInSegment(
          pCsr, pPtr, iTopic, pKey, nKey, iPtr, eSeek, &iOut, &bStop
      );
................................................................................
      rc = lsmMCursorLast(pCsr);
    }else{
      rc = lsmMCursorSeek(pCsr, 1, "", 0, LSM_SEEK_GE);
    }

    while( rc==LSM_OK && lsmMCursorValid(pCsr) && rtIsSystem(pCsr->eType) ){
      void *pKey; int nKey;
      void *pVal = 0; int nVal = 0;

      rc = lsmMCursorKey(pCsr, &pKey, &nKey);
      if( rc==LSM_OK ) rc = lsmMCursorValue(pCsr, &pVal, &nVal);
      if( rc==LSM_OK && (nKey!=4 || nVal!=8) ) rc = LSM_CORRUPT_BKPT;

      if( rc==LSM_OK ){
        int iBlk;
................................................................................
  p = &pMW->hier;

  if( p->apHier==0 && pSeg->iRoot!=0 ){
    FileSystem *pFS = pMW->pDb->pFS;
    lsm_env *pEnv = pMW->pDb->pEnv;
    Page **apHier = 0;
    int nHier = 0;
    int iPg = (int)pSeg->iRoot;

    do {
      Page *pPg = 0;
      u8 *aData;
      int nData;
      int flags;

................................................................................
          break;
        }
        apHier = apNew;
        memmove(&apHier[1], &apHier[0], sizeof(Page *) * nHier);
        nHier++;

        apHier[0] = pPg;
        iPg = (int)pageGetPtr(aData, nData);
      }else{
        lsmFsPageRelease(pPg);
        break;
      }
    }while( 1 );

    if( rc==LSM_OK ){
................................................................................

      /* If the key will fit on this page, break out of the loop here.
      ** The new entry will be written to page apHier[iLevel]. */
      pOld = p->apHier[iLevel];
      assert( lsmFsPageWritable(pOld) );
      aData = fsPageData(pOld, &nData);
      if( eType==0 ){
        nByte = 2 + 1 + lsmVarintLen32((int)iPtr) + lsmVarintLen32((int)iKeyPg);
      }else{
        nByte = 2 + 1 + lsmVarintLen32((int)iPtr) + lsmVarintLen32(nKey) + nKey;
      }
      nRec = pageGetNRec(aData, nData);
      nFree = SEGMENT_EOF(nData, nRec) - mergeWorkerPageOffset(aData, nData);
      if( nByte<=nFree ) break;

      /* Otherwise, this page is full. Set the right-hand-child pointer
      ** to iPtr and release it.  */
................................................................................
    }
  }

  /* Write the key into page apHier[iLevel]. */
  aData = fsPageData(p->apHier[iLevel], &nData);
  iOff = mergeWorkerPageOffset(aData, nData);
  nRec = pageGetNRec(aData, nData);
  lsmPutU16(&aData[SEGMENT_CELLPTR_OFFSET(nData, nRec)], (u16)iOff);
  lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], (u16)(nRec+1));
  if( eType==0 ){
    aData[iOff++] = 0x00;
    iOff += lsmVarintPut32(&aData[iOff], (int)iPtr);
    iOff += lsmVarintPut32(&aData[iOff], (int)iKeyPg);
  }else{
    aData[iOff++] = eType;
    iOff += lsmVarintPut32(&aData[iOff], (int)iPtr);
    iOff += lsmVarintPut32(&aData[iOff], nKey);
    memcpy(&aData[iOff], pKey, nKey);
  }

  return rc;
}

................................................................................
  int iFPtr = 0;                  /* Pointer value read from footer of pPg */
  MultiCursor *pCsr = pMW->pCsr;

  assert( pMW->pPage==0 );

  if( pCsr->pBtCsr ){
    rc = LSM_OK;
    iFPtr = (int)pMW->pLevel->pNext->lhs.iFirst;
  }else if( pCsr->nPtr>0 ){
    Segment *pSeg;
    pSeg = pCsr->aPtr[pCsr->nPtr-1].pSeg;
    rc = lsmFsDbPageGet(pMW->pDb->pFS, pSeg, pSeg->iFirst, &pPg);
    if( rc==LSM_OK ){
      u8 *aData;                    /* Buffer for page pPg */
      int nData;                    /* Size of aData[] in bytes */
      aData = fsPageData(pPg, &nData);
      iFPtr = (int)pageGetPtr(aData, nData);
      lsmFsPageRelease(pPg);
    }
  }

  if( rc==LSM_OK ){
    rc = mergeWorkerNextPage(pMW, iFPtr);
    if( pCsr->pPrevMergePtr ) *pCsr->pPrevMergePtr = iFPtr;
................................................................................
  int iPtr                        /* Absolute value of page pointer, or 0 */
){
  int rc = LSM_OK;                /* Return code */
  Merge *pMerge;                  /* Persistent part of level merge state */
  int nHdr;                       /* Space required for this record header */
  Page *pPg;                      /* Page to write to */
  u8 *aData;                      /* Data buffer for page pWriter->pPage */
  int nData = 0;                  /* Size of buffer aData[] in bytes */
  int nRec = 0;                   /* Number of records on page pPg */
  int iFPtr = 0;                  /* Value of pointer in footer of pPg */
  int iRPtr = 0;                  /* Value of pointer written into record */
  int iOff = 0;                   /* Current write offset within page pPg */
  Segment *pSeg;                  /* Segment being written */
  int flags = 0;                  /* If != 0, flags value for page footer */
  int bFirst = 0;                 /* True for first key of output run */

  pMerge = pMW->pLevel->pMerge;    
  pSeg = &pMW->pLevel->lhs;

................................................................................
    rc = mergeWorkerFirstPage(pMW);
    bFirst = 1;
  }
  pPg = pMW->pPage;
  if( pPg ){
    aData = fsPageData(pPg, &nData);
    nRec = pageGetNRec(aData, nData);
    iFPtr = (int)pageGetPtr(aData, nData);
    iRPtr = iPtr - iFPtr;
  }
     
  /* Figure out how much space is required by the new record. The space
  ** required is divided into two sections: the header and the body. The
  ** header consists of the intial varint fields. The body are the blobs 
  ** of data that correspond to the key and value data. The entire header 
................................................................................
    nHdr = 1 + lsmVarintLen32(iRPtr) + lsmVarintLen32(nKey);
    if( rtIsWrite(eType) ) nHdr += lsmVarintLen32(nVal);

    /* If the entire header will not fit on page pPg, or if page pPg is 
    ** marked read-only, advance to the next page of the output run. */
    iOff = pMerge->iOutputOff;
    if( iOff<0 || pPg==0 || iOff+nHdr > SEGMENT_EOF(nData, nRec+1) ){
      iFPtr = (int)*pMW->pCsr->pPrevMergePtr;
      iRPtr = iPtr - iFPtr;
      iOff = 0;
      nRec = 0;
      rc = mergeWorkerNextPage(pMW, iFPtr);
      pPg = pMW->pPage;
    }
  }
................................................................................
  }

  /* Update the output segment */
  if( rc==LSM_OK ){
    aData = fsPageData(pPg, &nData);

    /* Update the page footer. */
    lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], (u16)(nRec+1));
    lsmPutU16(&aData[SEGMENT_CELLPTR_OFFSET(nData, nRec)], (u16)iOff);
    if( flags ) lsmPutU16(&aData[SEGMENT_FLAGS_OFFSET(nData)], (u16)flags);

    /* Write the entry header into the current page. */
    aData[iOff++] = (u8)eType;                                               /* 1 */
    iOff += lsmVarintPut32(&aData[iOff], iRPtr);                         /* 2 */
    iOff += lsmVarintPut32(&aData[iOff], nKey);                          /* 3 */
    if( rtIsWrite(eType) ) iOff += lsmVarintPut32(&aData[iOff], nVal);   /* 4 */
    pMerge->iOutputOff = iOff;

    /* Write the key and data into the segment. */
    assert( iFPtr==pageGetPtr(aData, nData) );
................................................................................
      rc = multiCursorGetVal(pCsr, iVal, &pVal, &nVal);
      if( pVal && rc==LSM_OK ){
        assert( nVal>=0 );
        rc = sortedBlobSet(pDb->pEnv, &pCsr->val, pVal, nVal);
        pVal = pCsr->val.pData;
      }
      if( rc==LSM_OK ){
        rc = mergeWorkerWrite(pMW, eType, pKey, nKey, pVal, nVal, (int)iPtr);
      }
    }
  }

  /* Advance the cursor to the next input record (assuming one exists). */
  assert( lsmMCursorValid(pMW->pCsr) );
  if( rc==LSM_OK ) rc = lsmMCursorNext(pMW->pCsr);
................................................................................
      int i;
      for(i=0; rc==LSM_OK && i<pCsr->nPtr; i++){
        MergeInput *pInput = &pMerge->aInput[i];
        if( pInput->iPg ){
          SegmentPtr *pPtr;
          assert( pCsr->aPtr[i].pPg==0 );
          pPtr = &pCsr->aPtr[i];
          rc = segmentPtrLoadPage(pDb->pFS, pPtr, (int)pInput->iPg);
          if( rc==LSM_OK && pPtr->nCell>0 ){
            rc = segmentPtrLoadCell(pPtr, pInput->iCell);
          }
        }
      }

      if( rc==LSM_OK && pCsr->pBtCsr ){
................................................................................
    u32 nUnsync;
    int nPgsz;

    lsmCheckpointSynced(pDb, 0, 0, &nSync);
    nUnsync = lsmCheckpointNWrite(pDb->pShmhdr->aSnap1, 0);
    nPgsz = lsmCheckpointPgsz(pDb->pShmhdr->aSnap1);

    nMax = (int)LSM_MIN(nMax, (pDb->nAutockpt/nPgsz) - (int)(nUnsync-nSync));
    if( nMax<nRem ){
      bCkpt = 1;
      nRem = LSM_MAX(nMax, 0);
    }
  }

  /* If there exists in-memory data ready to be flushed to disk, attempt
................................................................................
  int flags;
  u8 *aData;
  int nData;

  aData = fsPageData(pPg, &nData);

  nRec = pageGetNRec(aData, nData);
  iPtr = (int)pageGetPtr(aData, nData);
  flags = pageGetFlags(aData, nData);

  lsmStringInit(&s, pDb->pEnv);
  lsmStringAppendf(&s,"nCell=%d iPtr=%d flags=%d {", nRec, iPtr, flags);
  if( flags&SEGMENT_BTREE_FLAG ) iPtr = 0;

  for(i=0; i<nRec; i++){
    Page *pRef = 0;               /* Pointer to page iRef */
    int iChar;
    u8 *aKey; int nKey = 0;       /* Key */
    u8 *aVal = 0; int nVal = 0;   /* Value */
    int iTopic;
    u8 *aCell;
    int iPgPtr;
    int eType;

    aCell = pageGetCell(aData, nData, i);
    eType = *aCell++;
................................................................................
  int *piPgPtr,
  u8 **paKey, int *pnKey,
  u8 **paVal, int *pnVal,
  Blob *pBlob
){
  u8 *aData; int nData;           /* Page data */
  u8 *aKey; int nKey = 0;         /* Key */
  u8 *aVal = 0; int nVal = 0;     /* Value */
  int eType;
  int iPgPtr;
  Page *pRef = 0;                 /* Pointer to page iRef */
  u8 *aCell;

  aData = fsPageData(pPg, &nData);

................................................................................
    int iPtr;
    int flags;
    int iCell;
    u8 *aData; int nData;         /* Page data and size thereof */

    aData = fsPageData(pPg, &nData);
    nRec = pageGetNRec(aData, nData);
    iPtr = (int)pageGetPtr(aData, nData);
    flags = pageGetFlags(aData, nData);

    lsmStringInit(&str, pDb->pEnv);
    lsmStringAppendf(&str, "Page : %lld  (%d bytes)\n", iPg, nData);
    lsmStringAppendf(&str, "nRec : %d\n", nRec);
    lsmStringAppendf(&str, "iPtr : %d\n", iPtr);
    lsmStringAppendf(&str, "flags: %04x\n", flags);

Changes to ext/lsm1/lsm_tree.c.

274
275
276
277
278
279
280

281
282
283
284
285
286
287
288
289
...
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
...
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
...
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
...
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
....
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
....
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
....
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
....
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
....
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
....
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
....
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
....
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
....
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
....
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
....
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
....
2024
2025
2026
2027
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
2044
2045
2046
2047
2048
2049
2050
....
2163
2164
2165
2166
2167
2168
2169
2170
2171
2172
2173
2174
2175
2176
2177
2178
2179
2180
2181
2182
2183
2184
2185
2186
2187
2188
2189
....
2220
2221
2222
2223
2224
2225
2226
2227
2228
2229
2230
2231
2232
2233
2234
2235
2236
2237
2238
2239

/*
** The pointer passed as the first argument points to an interior node,
** not a leaf. This function returns the offset of the iCell'th child
** sub-tree of the node.
*/
static u32 getChildPtr(TreeNode *p, int iVersion, int iCell){

  assert( iCell>=0 && iCell<=array_size(p->aiChildPtr) );
  if( p->iV2 && p->iV2<=iVersion && iCell==p->iV2Child ) return p->iV2Ptr;
  return p->aiChildPtr[iCell];
}

/*
** Given an offset within the *-shm file, return the associated chunk number.
*/
static int treeOffsetToChunk(u32 iOff){
................................................................................

/*
** Return a pointer to the mapped memory location associated with *-shm 
** file offset iPtr.
*/
static void *treeShmptr(lsm_db *pDb, u32 iPtr){

  assert( (iPtr>>15)<pDb->nShm );
  assert( pDb->apShm[iPtr>>15] );

  return iPtr ? treeShmptrUnsafe(pDb, iPtr) : 0;
}

static ShmChunk * treeShmChunk(lsm_db *pDb, int iChunk){
  return (ShmChunk *)(pDb->apShm[iChunk]);
................................................................................
    printf("% 6d %.*sleaf%.*s: %s\n", 
        iNode, nPath, zPath, 20-nPath-4, zSpace, s.z
    );
    lsmStringClear(&s);
  }else{
    for(i=0; i<4 && nHeight>0; i++){
      u32 iPtr = getChildPtr(pNode, pDb->treehdr.root.iTransId, i);
      zPath[nPath] = i+'0';
      zPath[nPath+1] = '/';

      if( iPtr ){
        dump_node_contents(pDb, iPtr, zPath, nPath+2, nHeight-1);
      }
      if( i!=3 && pNode->aiKeyPtr[i] ){
        TreeKey *pKey = treeShmkey(pDb, pNode->aiKeyPtr[i], TKV_LOADKEY,&b,&rc);
................................................................................
      iWrite = (iWrite + 3) & ~0x0003;
      assert( (iWrite % 4)==0 );
    }

    assert( iWrite );
    iChunk = treeOffsetToChunk(iWrite-1);
    iEof = (iChunk+1) * CHUNK_SIZE;
    assert( iEof>=iWrite && (iEof-iWrite)<CHUNK_SIZE );
    if( (iWrite+nByte)>iEof ){
      ShmChunk *pHdr;           /* Header of chunk just finished (iChunk) */
      ShmChunk *pFirst;         /* Header of chunk treehdr.iFirst */
      ShmChunk *pNext;          /* Header of new chunk */
      int iNext = 0;            /* Next chunk */
      int rc = LSM_OK;

................................................................................
    while( nRem>0 ){
      u8 *aAlloc;
      int nAlloc;
      u32 iWrite;

      iWrite = (pDb->treehdr.iWrite & (LSM_SHM_CHUNK_SIZE-1));
      iWrite = LSM_MAX(iWrite, LSM_SHM_CHUNK_HDR);
      nAlloc = LSM_MIN((LSM_SHM_CHUNK_SIZE-iWrite), nRem);

      aAlloc = treeShmptr(pDb, treeShmalloc(pDb, 0, nAlloc, pRc));
      if( aAlloc==0 ) break;
      memcpy(aAlloc, &a[n-nRem], nAlloc);
      nRem -= nAlloc;
    }
    a = pVal;
................................................................................
      }
    }else{
      p = 0;
    }
    nVisit++;
  }

  if( rc==LSM_OK && nVisit!=db->treehdr.nChunk-1 ){
    rc = LSM_CORRUPT_BKPT;
  }
  return rc;
}

/*
** Iterate through the current in-memory tree. If there are any v2-pointers
................................................................................
  int i;
  ShmChunk *p;
  ShmChunk *pMin = 0;
  u32 iMin = 0;

  /* Iterate through all shm chunks. Find the smallest shm-id present in
  ** the shared-memory region. */
  for(i=1; rc==LSM_OK && i<db->treehdr.nChunk; i++){
    p = treeShmChunkRc(db, i, &rc);
    if( p && (pMin==0 || shm_sequence_ge(pMin->iShmid, p->iShmid)) ){
      pMin = p;
      iMin = i;
    }
  }

................................................................................
    int nSort;
    int nByte;
    u32 iPrevShmid;
    ShmChunkLoc *aSort;

    /* Allocate space for a merge sort. */
    nSort = 1;
    while( nSort < (db->treehdr.nChunk-1) ) nSort = nSort * 2;
    nByte = sizeof(ShmChunkLoc) * nSort * 2;
    aSort = lsmMallocZeroRc(db->pEnv, nByte, &rc);
    iPrevShmid = pMin->iShmid;

    /* Fix all shm-ids, if required. */
    if( rc==LSM_OK ){
      iPrevShmid = pMin->iShmid-1;
      for(i=1; i<db->treehdr.nChunk; i++){
        p = treeShmChunk(db, i);
        aSort[i-1].pShm = p;
        aSort[i-1].iLoc = i;
        if( i!=db->treehdr.iFirst ){
          if( shm_sequence_ge(p->iShmid, db->treehdr.iNextShmid) ){
            p->iShmid = iPrevShmid--;
          }
        }
      }
      if( iMin!=db->treehdr.iFirst ){
        p = treeShmChunk(db, db->treehdr.iFirst);
................................................................................
    TreeRoot *p = &db->treehdr.root;
    TreeNode *pNew;
    u32 iNew;
    TreeNode *pNode = pCsr->apTreeNode[pCsr->iNode];
    int iCell = pCsr->aiCell[pCsr->iNode];

    /* Create a copy of this node */
    if( (pCsr->iNode>0 && pCsr->iNode==(p->nHeight-1)) ){
      pNew = copyTreeLeaf(db, (TreeLeaf *)pNode, &iNew, pRc);
    }else{
      pNew = copyTreeNode(db, pNode, &iNew, pRc);
    }

    if( pNew ){
      /* Modify the value in the new version */
................................................................................
}

static int treeNextIsEndDelete(lsm_db *db, TreeCursor *pCsr){
  int iNode = pCsr->iNode;
  int iCell = pCsr->aiCell[iNode]+1;

  /* Cursor currently points to a leaf node. */
  assert( pCsr->iNode==(db->treehdr.root.nHeight-1) );

  while( iNode>=0 ){
    TreeNode *pNode = pCsr->apTreeNode[iNode];
    if( iCell<3 && pNode->aiKeyPtr[iCell] ){
      int rc = LSM_OK;
      TreeKey *pKey = treeShmptr(db, pNode->aiKeyPtr[iCell]);
      assert( rc==LSM_OK );
................................................................................
  return 0;
}

static int treePrevIsStartDelete(lsm_db *db, TreeCursor *pCsr){
  int iNode = pCsr->iNode;

  /* Cursor currently points to a leaf node. */
  assert( pCsr->iNode==(db->treehdr.root.nHeight-1) );

  while( iNode>=0 ){
    TreeNode *pNode = pCsr->apTreeNode[iNode];
    int iCell = pCsr->aiCell[iNode]-1;
    if( iCell>=0 && pNode->aiKeyPtr[iCell] ){
      int rc = LSM_OK;
      TreeKey *pKey = treeShmptr(db, pNode->aiKeyPtr[iCell]);
................................................................................
  int nVal                        /* Bytes in value data (or -ve for delete) */
){
  int rc = LSM_OK;                /* Return Code */
  TreeKey *pTreeKey;              /* New key-value being inserted */
  u32 iTreeKey;
  TreeRoot *p = &pDb->treehdr.root;
  TreeCursor csr;                 /* Cursor to seek to pKey/nKey */
  int res;                        /* Result of seek operation on csr */

  assert( nVal>=0 || pVal==0 );
  assert_tree_looks_ok(LSM_OK, pTree);
  assert( flags==LSM_INSERT       || flags==LSM_POINT_DELETE 
       || flags==LSM_START_DELETE || flags==LSM_END_DELETE 
  );
  assert( (flags & LSM_CONTIGUOUS)==0 );
................................................................................
  int iSlot = pCsr->aiCell[pCsr->iNode];
  int bLeaf;
  int rc = LSM_OK;

  assert( pNode->aiKeyPtr[1] );
  assert( pNode->aiKeyPtr[iSlot] );
  assert( iSlot==0 || iSlot==1 || iSlot==2 );
  assert( (pCsr->iNode==(db->treehdr.root.nHeight-1))==(iNewptr==0) );

  bLeaf = (pCsr->iNode==(p->nHeight-1) && p->nHeight>1);
  
  if( pNode->aiKeyPtr[0] || pNode->aiKeyPtr[2] ){
    /* There are currently at least 2 keys on this node. So just create
    ** a new copy of the node with one of the keys removed. If the node
    ** happens to be the root node of the tree, allocate an entire 
    ** TreeNode structure instead of just a TreeLeaf.  */
    TreeNode *pNew;
................................................................................
          }
        }
      }
      if( iDir==-1 ){
        iPSlot--;
        pNew1->aiKeyPtr[iKOut++] = pParent->aiKeyPtr[iPSlot];
        if( bLeaf==0 ) pNew1->aiChildPtr[iPOut++] = iNewptr;
        pCsr->aiCell[pCsr->iNode] = iPSlot;
      }

      rc = treeDeleteEntry(db, pCsr, iNew1);
    }
  }

  return rc;
................................................................................
    bDone = 1;
    if( lsmTreeCursorValid(&csr) ){
      lsmTreeCursorKey(&csr, 0, &pDel, &nDel);
      if( treeKeycmp(pDel, nDel, pKey2, nKey2)<0 ) bDone = 0;
    }

    if( bDone==0 ){
      if( csr.iNode==(p->nHeight-1) ){
        /* The element to delete already lies on a leaf node */
        rc = treeDeleteEntry(db, &csr, 0);
      }else{
        /* 1. Overwrite the current key with a copy of the next key in the 
        **    tree (key N).
        **
        ** 2. Seek to key N (cursor will stop at the internal node copy of
................................................................................
        **    N). Move to the next key (original copy of N). Delete
        **    this entry. 
        */
        u32 iKey;
        TreeKey *pKey;
        int iNode = csr.iNode;
        lsmTreeCursorNext(&csr);
        assert( csr.iNode==(p->nHeight-1) );

        iKey = csr.apTreeNode[csr.iNode]->aiKeyPtr[csr.aiCell[csr.iNode]];
        lsmTreeCursorPrev(&csr);

        treeOverwriteKey(db, &csr, iKey, &rc);
        pKey = treeShmkey(db, iKey, TKV_LOADKEY, &blob, &rc);
        if( pKey ){
................................................................................
        pTreeKey = (TreeKey*)treeShmptrUnsafe(pDb, iTreeKey);
        if( !(pTreeKey->flags & LSM_CONTIGUOUS) ){
          pTreeKey = treeShmkey(pDb, iTreeKey, TKV_LOADKEY, &b, &rc);
          if( rc ) break;
        }
        res = treeKeycmp((void *)&pTreeKey[1], pTreeKey->nKey, pKey, nKey);
        if( res==0 ){
          pCsr->aiCell[iNode] = iTest;
          break;
        }
      }else{
        iTest = 1;
      }

      if( iNode<(pRoot->nHeight-1) ){
        iNodePtr = getChildPtr(pNode, pRoot->iTransId, iTest + (res<0));
      }else{
        iNodePtr = 0;
      }
      pCsr->aiCell[iNode] = iTest + (iNodePtr && (res<0));
    }

    *pRes = res;
    pCsr->iNode = iNode;
    tblobFree(pDb, &b);
  }

................................................................................
      u32 iNodePtr;
      pCsr->iNode++;
      iNodePtr = getChildPtr(pNode, pRoot->iTransId, iCell);
      pNode = (TreeNode *)treeShmptr(pDb, iNodePtr);
      if( rc!=LSM_OK ) break;
      pCsr->apTreeNode[pCsr->iNode] = pNode;
      iCell = 1 + (pNode->aiKeyPtr[2]!=0) + (pCsr->iNode < iLeaf);
      pCsr->aiCell[pCsr->iNode] = iCell;
    }while( pCsr->iNode < iLeaf );
  }

  /* Otherwise, the next key is found by following pointer up the tree until
  ** there is a key immediately to the left of the pointer followed to reach
  ** the sub-tree containing the current key. */
  else{
    do {
      iCell = pCsr->aiCell[pCsr->iNode]-1;
      if( iCell>=0 && pCsr->apTreeNode[pCsr->iNode]->aiKeyPtr[iCell] ) break;
    }while( (--pCsr->iNode)>=0 );
    pCsr->aiCell[pCsr->iNode] = iCell;
  }

#ifndef NDEBUG
  if( pCsr->iNode>=0 ){
    TreeKey *pK2 = csrGetKey(pCsr, &pCsr->blob, &rc);
    assert( rc || treeKeycmp(TKV_KEY(pK2),pK2->nKey,TKV_KEY(pK1),pK1->nKey)<0 );
  }
................................................................................
      iCell = ((pNode->aiKeyPtr[2]==0) ? 2 : 3);
    }else{
      iCell = ((pNode->aiKeyPtr[0]==0) ? 1 : 0);
    }
    pCsr->iNode++;
    pCsr->apTreeNode[pCsr->iNode] = pNode;

    if( pCsr->iNode<pRoot->nHeight-1 ){
      iNodePtr = getChildPtr(pNode, pRoot->iTransId, iCell);
    }else{
      iNodePtr = 0;
    }
    pCsr->aiCell[pCsr->iNode] = iCell - (iNodePtr==0 && bLast);
  }

  return rc;
}

int lsmTreeCursorFlags(TreeCursor *pCsr){
  int flags = 0;







>

|







 







|







 







|







 







|







 







|







 







|







 







|







 







|







|



|







 







|







 







|







 







|







 







|







 







|

|







 







|







 







|







 







|







 







|






|




|







 







|











|







 







|




|







274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
...
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
...
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
...
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
...
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
....
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
....
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
....
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
....
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
....
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
....
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
....
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
....
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
....
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
....
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
....
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
....
2025
2026
2027
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
2044
2045
2046
2047
2048
2049
2050
2051
....
2164
2165
2166
2167
2168
2169
2170
2171
2172
2173
2174
2175
2176
2177
2178
2179
2180
2181
2182
2183
2184
2185
2186
2187
2188
2189
2190
....
2221
2222
2223
2224
2225
2226
2227
2228
2229
2230
2231
2232
2233
2234
2235
2236
2237
2238
2239
2240

/*
** The pointer passed as the first argument points to an interior node,
** not a leaf. This function returns the offset of the iCell'th child
** sub-tree of the node.
*/
static u32 getChildPtr(TreeNode *p, int iVersion, int iCell){
  assert( iVersion>=0 );
  assert( iCell>=0 && iCell<=array_size(p->aiChildPtr) );
  if( p->iV2 && p->iV2<=(u32)iVersion && iCell==p->iV2Child ) return p->iV2Ptr;
  return p->aiChildPtr[iCell];
}

/*
** Given an offset within the *-shm file, return the associated chunk number.
*/
static int treeOffsetToChunk(u32 iOff){
................................................................................

/*
** Return a pointer to the mapped memory location associated with *-shm 
** file offset iPtr.
*/
static void *treeShmptr(lsm_db *pDb, u32 iPtr){

  assert( (iPtr>>15)<(u32)pDb->nShm );
  assert( pDb->apShm[iPtr>>15] );

  return iPtr ? treeShmptrUnsafe(pDb, iPtr) : 0;
}

static ShmChunk * treeShmChunk(lsm_db *pDb, int iChunk){
  return (ShmChunk *)(pDb->apShm[iChunk]);
................................................................................
    printf("% 6d %.*sleaf%.*s: %s\n", 
        iNode, nPath, zPath, 20-nPath-4, zSpace, s.z
    );
    lsmStringClear(&s);
  }else{
    for(i=0; i<4 && nHeight>0; i++){
      u32 iPtr = getChildPtr(pNode, pDb->treehdr.root.iTransId, i);
      zPath[nPath] = (char)(i+'0');
      zPath[nPath+1] = '/';

      if( iPtr ){
        dump_node_contents(pDb, iPtr, zPath, nPath+2, nHeight-1);
      }
      if( i!=3 && pNode->aiKeyPtr[i] ){
        TreeKey *pKey = treeShmkey(pDb, pNode->aiKeyPtr[i], TKV_LOADKEY,&b,&rc);
................................................................................
      iWrite = (iWrite + 3) & ~0x0003;
      assert( (iWrite % 4)==0 );
    }

    assert( iWrite );
    iChunk = treeOffsetToChunk(iWrite-1);
    iEof = (iChunk+1) * CHUNK_SIZE;
    assert( iEof>=iWrite && (iEof-iWrite)<(u32)CHUNK_SIZE );
    if( (iWrite+nByte)>iEof ){
      ShmChunk *pHdr;           /* Header of chunk just finished (iChunk) */
      ShmChunk *pFirst;         /* Header of chunk treehdr.iFirst */
      ShmChunk *pNext;          /* Header of new chunk */
      int iNext = 0;            /* Next chunk */
      int rc = LSM_OK;

................................................................................
    while( nRem>0 ){
      u8 *aAlloc;
      int nAlloc;
      u32 iWrite;

      iWrite = (pDb->treehdr.iWrite & (LSM_SHM_CHUNK_SIZE-1));
      iWrite = LSM_MAX(iWrite, LSM_SHM_CHUNK_HDR);
      nAlloc = LSM_MIN((LSM_SHM_CHUNK_SIZE-iWrite), (u32)nRem);

      aAlloc = treeShmptr(pDb, treeShmalloc(pDb, 0, nAlloc, pRc));
      if( aAlloc==0 ) break;
      memcpy(aAlloc, &a[n-nRem], nAlloc);
      nRem -= nAlloc;
    }
    a = pVal;
................................................................................
      }
    }else{
      p = 0;
    }
    nVisit++;
  }

  if( rc==LSM_OK && (u32)nVisit!=db->treehdr.nChunk-1 ){
    rc = LSM_CORRUPT_BKPT;
  }
  return rc;
}

/*
** Iterate through the current in-memory tree. If there are any v2-pointers
................................................................................
  int i;
  ShmChunk *p;
  ShmChunk *pMin = 0;
  u32 iMin = 0;

  /* Iterate through all shm chunks. Find the smallest shm-id present in
  ** the shared-memory region. */
  for(i=1; rc==LSM_OK && (u32)i<db->treehdr.nChunk; i++){
    p = treeShmChunkRc(db, i, &rc);
    if( p && (pMin==0 || shm_sequence_ge(pMin->iShmid, p->iShmid)) ){
      pMin = p;
      iMin = i;
    }
  }

................................................................................
    int nSort;
    int nByte;
    u32 iPrevShmid;
    ShmChunkLoc *aSort;

    /* Allocate space for a merge sort. */
    nSort = 1;
    while( (u32)nSort < (db->treehdr.nChunk-1) ) nSort = nSort * 2;
    nByte = sizeof(ShmChunkLoc) * nSort * 2;
    aSort = lsmMallocZeroRc(db->pEnv, nByte, &rc);
    iPrevShmid = pMin->iShmid;

    /* Fix all shm-ids, if required. */
    if( rc==LSM_OK ){
      iPrevShmid = pMin->iShmid-1;
      for(i=1; (u32)i<db->treehdr.nChunk; i++){
        p = treeShmChunk(db, i);
        aSort[i-1].pShm = p;
        aSort[i-1].iLoc = i;
        if( (u32)i!=db->treehdr.iFirst ){
          if( shm_sequence_ge(p->iShmid, db->treehdr.iNextShmid) ){
            p->iShmid = iPrevShmid--;
          }
        }
      }
      if( iMin!=db->treehdr.iFirst ){
        p = treeShmChunk(db, db->treehdr.iFirst);
................................................................................
    TreeRoot *p = &db->treehdr.root;
    TreeNode *pNew;
    u32 iNew;
    TreeNode *pNode = pCsr->apTreeNode[pCsr->iNode];
    int iCell = pCsr->aiCell[pCsr->iNode];

    /* Create a copy of this node */
    if( (pCsr->iNode>0 && (u32)pCsr->iNode==(p->nHeight-1)) ){
      pNew = copyTreeLeaf(db, (TreeLeaf *)pNode, &iNew, pRc);
    }else{
      pNew = copyTreeNode(db, pNode, &iNew, pRc);
    }

    if( pNew ){
      /* Modify the value in the new version */
................................................................................
}

static int treeNextIsEndDelete(lsm_db *db, TreeCursor *pCsr){
  int iNode = pCsr->iNode;
  int iCell = pCsr->aiCell[iNode]+1;

  /* Cursor currently points to a leaf node. */
  assert( (u32)pCsr->iNode==(db->treehdr.root.nHeight-1) );

  while( iNode>=0 ){
    TreeNode *pNode = pCsr->apTreeNode[iNode];
    if( iCell<3 && pNode->aiKeyPtr[iCell] ){
      int rc = LSM_OK;
      TreeKey *pKey = treeShmptr(db, pNode->aiKeyPtr[iCell]);
      assert( rc==LSM_OK );
................................................................................
  return 0;
}

static int treePrevIsStartDelete(lsm_db *db, TreeCursor *pCsr){
  int iNode = pCsr->iNode;

  /* Cursor currently points to a leaf node. */
  assert( (u32)pCsr->iNode==(db->treehdr.root.nHeight-1) );

  while( iNode>=0 ){
    TreeNode *pNode = pCsr->apTreeNode[iNode];
    int iCell = pCsr->aiCell[iNode]-1;
    if( iCell>=0 && pNode->aiKeyPtr[iCell] ){
      int rc = LSM_OK;
      TreeKey *pKey = treeShmptr(db, pNode->aiKeyPtr[iCell]);
................................................................................
  int nVal                        /* Bytes in value data (or -ve for delete) */
){
  int rc = LSM_OK;                /* Return Code */
  TreeKey *pTreeKey;              /* New key-value being inserted */
  u32 iTreeKey;
  TreeRoot *p = &pDb->treehdr.root;
  TreeCursor csr;                 /* Cursor to seek to pKey/nKey */
  int res = 0;                    /* Result of seek operation on csr */

  assert( nVal>=0 || pVal==0 );
  assert_tree_looks_ok(LSM_OK, pTree);
  assert( flags==LSM_INSERT       || flags==LSM_POINT_DELETE 
       || flags==LSM_START_DELETE || flags==LSM_END_DELETE 
  );
  assert( (flags & LSM_CONTIGUOUS)==0 );
................................................................................
  int iSlot = pCsr->aiCell[pCsr->iNode];
  int bLeaf;
  int rc = LSM_OK;

  assert( pNode->aiKeyPtr[1] );
  assert( pNode->aiKeyPtr[iSlot] );
  assert( iSlot==0 || iSlot==1 || iSlot==2 );
  assert( ((u32)pCsr->iNode==(db->treehdr.root.nHeight-1))==(iNewptr==0) );

  bLeaf = ((u32)pCsr->iNode==(p->nHeight-1) && p->nHeight>1);
  
  if( pNode->aiKeyPtr[0] || pNode->aiKeyPtr[2] ){
    /* There are currently at least 2 keys on this node. So just create
    ** a new copy of the node with one of the keys removed. If the node
    ** happens to be the root node of the tree, allocate an entire 
    ** TreeNode structure instead of just a TreeLeaf.  */
    TreeNode *pNew;
................................................................................
          }
        }
      }
      if( iDir==-1 ){
        iPSlot--;
        pNew1->aiKeyPtr[iKOut++] = pParent->aiKeyPtr[iPSlot];
        if( bLeaf==0 ) pNew1->aiChildPtr[iPOut++] = iNewptr;
        pCsr->aiCell[pCsr->iNode] = (u8)iPSlot;
      }

      rc = treeDeleteEntry(db, pCsr, iNew1);
    }
  }

  return rc;
................................................................................
    bDone = 1;
    if( lsmTreeCursorValid(&csr) ){
      lsmTreeCursorKey(&csr, 0, &pDel, &nDel);
      if( treeKeycmp(pDel, nDel, pKey2, nKey2)<0 ) bDone = 0;
    }

    if( bDone==0 ){
      if( (u32)csr.iNode==(p->nHeight-1) ){
        /* The element to delete already lies on a leaf node */
        rc = treeDeleteEntry(db, &csr, 0);
      }else{
        /* 1. Overwrite the current key with a copy of the next key in the 
        **    tree (key N).
        **
        ** 2. Seek to key N (cursor will stop at the internal node copy of
................................................................................
        **    N). Move to the next key (original copy of N). Delete
        **    this entry. 
        */
        u32 iKey;
        TreeKey *pKey;
        int iNode = csr.iNode;
        lsmTreeCursorNext(&csr);
        assert( (u32)csr.iNode==(p->nHeight-1) );

        iKey = csr.apTreeNode[csr.iNode]->aiKeyPtr[csr.aiCell[csr.iNode]];
        lsmTreeCursorPrev(&csr);

        treeOverwriteKey(db, &csr, iKey, &rc);
        pKey = treeShmkey(db, iKey, TKV_LOADKEY, &blob, &rc);
        if( pKey ){
................................................................................
        pTreeKey = (TreeKey*)treeShmptrUnsafe(pDb, iTreeKey);
        if( !(pTreeKey->flags & LSM_CONTIGUOUS) ){
          pTreeKey = treeShmkey(pDb, iTreeKey, TKV_LOADKEY, &b, &rc);
          if( rc ) break;
        }
        res = treeKeycmp((void *)&pTreeKey[1], pTreeKey->nKey, pKey, nKey);
        if( res==0 ){
          pCsr->aiCell[iNode] = (u8)iTest;
          break;
        }
      }else{
        iTest = 1;
      }

      if( (u32)iNode<(pRoot->nHeight-1) ){
        iNodePtr = getChildPtr(pNode, pRoot->iTransId, iTest + (res<0));
      }else{
        iNodePtr = 0;
      }
      pCsr->aiCell[iNode] = (u8)(iTest + (iNodePtr && (res<0)));
    }

    *pRes = res;
    pCsr->iNode = iNode;
    tblobFree(pDb, &b);
  }

................................................................................
      u32 iNodePtr;
      pCsr->iNode++;
      iNodePtr = getChildPtr(pNode, pRoot->iTransId, iCell);
      pNode = (TreeNode *)treeShmptr(pDb, iNodePtr);
      if( rc!=LSM_OK ) break;
      pCsr->apTreeNode[pCsr->iNode] = pNode;
      iCell = 1 + (pNode->aiKeyPtr[2]!=0) + (pCsr->iNode < iLeaf);
      pCsr->aiCell[pCsr->iNode] = (u8)iCell;
    }while( pCsr->iNode < iLeaf );
  }

  /* Otherwise, the next key is found by following pointer up the tree until
  ** there is a key immediately to the left of the pointer followed to reach
  ** the sub-tree containing the current key. */
  else{
    do {
      iCell = pCsr->aiCell[pCsr->iNode]-1;
      if( iCell>=0 && pCsr->apTreeNode[pCsr->iNode]->aiKeyPtr[iCell] ) break;
    }while( (--pCsr->iNode)>=0 );
    pCsr->aiCell[pCsr->iNode] = (u8)iCell;
  }

#ifndef NDEBUG
  if( pCsr->iNode>=0 ){
    TreeKey *pK2 = csrGetKey(pCsr, &pCsr->blob, &rc);
    assert( rc || treeKeycmp(TKV_KEY(pK2),pK2->nKey,TKV_KEY(pK1),pK1->nKey)<0 );
  }
................................................................................
      iCell = ((pNode->aiKeyPtr[2]==0) ? 2 : 3);
    }else{
      iCell = ((pNode->aiKeyPtr[0]==0) ? 1 : 0);
    }
    pCsr->iNode++;
    pCsr->apTreeNode[pCsr->iNode] = pNode;

    if( (u32)pCsr->iNode<pRoot->nHeight-1 ){
      iNodePtr = getChildPtr(pNode, pRoot->iTransId, iCell);
    }else{
      iNodePtr = 0;
    }
    pCsr->aiCell[pCsr->iNode] = (u8)(iCell - (iNodePtr==0 && bLast));
  }

  return rc;
}

int lsmTreeCursorFlags(TreeCursor *pCsr){
  int flags = 0;

Changes to ext/lsm1/lsm_varint.c.

172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
  }
  if( z[0]==250 ){
    *piVal = (z[1]<<16) + (z[2]<<8) + z[3];
    return 4;
  }

  ret = lsmSqlite4GetVarint64(z, &i);
  *piVal = i;
  return ret;
}

int lsmVarintLen32(int n){
  u8 aData[9];
  return lsmVarintPut32(aData, n);
}







|







172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
  }
  if( z[0]==250 ){
    *piVal = (z[1]<<16) + (z[2]<<8) + z[3];
    return 4;
  }

  ret = lsmSqlite4GetVarint64(z, &i);
  *piVal = (int)i;
  return ret;
}

int lsmVarintLen32(int n){
  u8 aData[9];
  return lsmVarintPut32(aData, n);
}

Changes to ext/lsm1/lsm_vtab.c.

153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
...
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
...
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
...
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
...
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
...
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687


/*
** Advance a lsm1_cursor to its next row of output.
*/
static int lsm1Next(sqlite3_vtab_cursor *cur){
  lsm1_cursor *pCur = (lsm1_cursor*)cur;
  int rc;
  if( pCur->bUnique ){
    pCur->atEof = 1;
  }else{
    if( pCur->isDesc ){
      rc = lsm_csr_prev(pCur->pLsmCur);
    }else{
      rc = lsm_csr_next(pCur->pLsmCur);
................................................................................
        pVal = (const void*)sqlite3_value_text(pValue);
        if( pVal==0 ) return SQLITE_NOMEM;
      }
      if( nVal+1>nSpace ){
        pSpace = sqlite3_malloc( nVal+1 );
        if( pSpace==0 ) return SQLITE_NOMEM;
      }
      pSpace[0] = eType;
      memcpy(&pSpace[1], pVal, nVal);
      *ppKey = pSpace;
      *pnKey = nVal+1;
      break;
    }
    case SQLITE_INTEGER: {
      sqlite3_int64 iVal = sqlite3_value_int64(pValue);
................................................................................
        if( iVal==0xffffffffffffffffLL ) return SQLITE_ERROR;
        uVal = *(sqlite3_uint64*)&iVal;
        eType = LSM1_TYPE_NEGATIVE;
      }else{
        uVal = iVal;
        eType = LSM1_TYPE_POSITIVE;
      }
      pSpace[0] = eType;
      *ppKey = pSpace;
      *pnKey = 1 + lsm1PutVarint64(&pSpace[1], uVal);
    }
  }
  return SQLITE_OK;
}

................................................................................
  sqlite3_value **argv,
  sqlite_int64 *pRowid
){
  lsm1_vtab *p = (lsm1_vtab*)pVTab;
  const void *pKey;
  int nKey;
  int eType;
  int rc;
  sqlite3_value *pValue;
  const unsigned char *pVal;
  unsigned char *pData;
  int nVal;
  unsigned char pSpace[100];

  if( argc==1 ){
................................................................................
          pVal = (unsigned char*)sqlite3_value_blob(pValue);
        }
        nVal = sqlite3_value_bytes(pValue);
        pData = sqlite3_malloc( nVal+1 );
        if( pData==0 ){
          rc = SQLITE_NOMEM;
        }else{
          pData[0] = eType;
          memcpy(&pData[1], pVal, nVal);
          rc = lsm_insert(p->pDb, pKey, nKey, pData, nVal+1);
          sqlite3_free(pData);
        }
        break;
      }
      case SQLITE_INTEGER:
................................................................................
          assert( sizeof(r)==sizeof(x) );
          memcpy(&x, &r, sizeof(r));
        }
        for(i=8; x>0 && i>=1; i--){
          aVal[i] = x & 0xff;
          x >>= 8;
        }
        aVal[i] = eType;
        rc = lsm_insert(p->pDb, pKey, nKey, &aVal[i], 9-i);
        break;
      }
    }
  }
  if( pKey!=(const void*)pSpace ) sqlite3_free((void*)pKey);
  return rc==LSM_OK ? SQLITE_OK : SQLITE_ERROR;







|







 







|







 







|







 







|







 







|







 







|







153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
...
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
...
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
...
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
...
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
...
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687


/*
** Advance a lsm1_cursor to its next row of output.
*/
static int lsm1Next(sqlite3_vtab_cursor *cur){
  lsm1_cursor *pCur = (lsm1_cursor*)cur;
  int rc = LSM_OK;
  if( pCur->bUnique ){
    pCur->atEof = 1;
  }else{
    if( pCur->isDesc ){
      rc = lsm_csr_prev(pCur->pLsmCur);
    }else{
      rc = lsm_csr_next(pCur->pLsmCur);
................................................................................
        pVal = (const void*)sqlite3_value_text(pValue);
        if( pVal==0 ) return SQLITE_NOMEM;
      }
      if( nVal+1>nSpace ){
        pSpace = sqlite3_malloc( nVal+1 );
        if( pSpace==0 ) return SQLITE_NOMEM;
      }
      pSpace[0] = (unsigned char)eType;
      memcpy(&pSpace[1], pVal, nVal);
      *ppKey = pSpace;
      *pnKey = nVal+1;
      break;
    }
    case SQLITE_INTEGER: {
      sqlite3_int64 iVal = sqlite3_value_int64(pValue);
................................................................................
        if( iVal==0xffffffffffffffffLL ) return SQLITE_ERROR;
        uVal = *(sqlite3_uint64*)&iVal;
        eType = LSM1_TYPE_NEGATIVE;
      }else{
        uVal = iVal;
        eType = LSM1_TYPE_POSITIVE;
      }
      pSpace[0] = (unsigned char)eType;
      *ppKey = pSpace;
      *pnKey = 1 + lsm1PutVarint64(&pSpace[1], uVal);
    }
  }
  return SQLITE_OK;
}

................................................................................
  sqlite3_value **argv,
  sqlite_int64 *pRowid
){
  lsm1_vtab *p = (lsm1_vtab*)pVTab;
  const void *pKey;
  int nKey;
  int eType;
  int rc = LSM_OK;
  sqlite3_value *pValue;
  const unsigned char *pVal;
  unsigned char *pData;
  int nVal;
  unsigned char pSpace[100];

  if( argc==1 ){
................................................................................
          pVal = (unsigned char*)sqlite3_value_blob(pValue);
        }
        nVal = sqlite3_value_bytes(pValue);
        pData = sqlite3_malloc( nVal+1 );
        if( pData==0 ){
          rc = SQLITE_NOMEM;
        }else{
          pData[0] = (unsigned char)eType;
          memcpy(&pData[1], pVal, nVal);
          rc = lsm_insert(p->pDb, pKey, nKey, pData, nVal+1);
          sqlite3_free(pData);
        }
        break;
      }
      case SQLITE_INTEGER:
................................................................................
          assert( sizeof(r)==sizeof(x) );
          memcpy(&x, &r, sizeof(r));
        }
        for(i=8; x>0 && i>=1; i--){
          aVal[i] = x & 0xff;
          x >>= 8;
        }
        aVal[i] = (unsigned char)eType;
        rc = lsm_insert(p->pDb, pKey, nKey, &aVal[i], 9-i);
        break;
      }
    }
  }
  if( pKey!=(const void*)pSpace ) sqlite3_free((void*)pKey);
  return rc==LSM_OK ? SQLITE_OK : SQLITE_ERROR;

Changes to ext/lsm1/lsm_win32.c.

644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
  int nNew = iChunk + 1;
  lsm_i64 nReq = nNew * sz;

  *ppShm = NULL;
  assert( sz>=0 );
  assert( sz==LSM_SHM_CHUNK_SIZE );
  if( iChunk>=pWin32File->nShm ){
    int i;
    LPHANDLE ahNew;
    LPVOID *apNew;
    LARGE_INTEGER fileSize;

    /* If the shared-memory file has not been opened, open it now. */
    if( pWin32File->hShmFile==NULL ){
      char *zShm = win32ShmFile(pWin32File);







<







644
645
646
647
648
649
650

651
652
653
654
655
656
657
  int nNew = iChunk + 1;
  lsm_i64 nReq = nNew * sz;

  *ppShm = NULL;
  assert( sz>=0 );
  assert( sz==LSM_SHM_CHUNK_SIZE );
  if( iChunk>=pWin32File->nShm ){

    LPHANDLE ahNew;
    LPVOID *apNew;
    LARGE_INTEGER fileSize;

    /* If the shared-memory file has not been opened, open it now. */
    if( pWin32File->hShmFile==NULL ){
      char *zShm = win32ShmFile(pWin32File);