Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Add the INFO_COMPRESSION_ID request. And the factory method for providing compression/encryption functions.
Downloads: Tarball | ZIP archive
Timelines: family | ancestors | descendants | both | compression-id
Files: files | file ages | folders
SHA1: bb85de9cd3a59efdfdc92ae6b87dc73b81ca63e3
User & Date: dan 2013-02-07 19:50:51.233
Context
2013-02-08
11:30
Merge compression-id branch with trunk. check-in: 76297939d3 user: dan tags: trunk
2013-02-07
19:50
Add the INFO_COMPRESSION_ID request. And the factory method for providing compression/encryption functions. Leaf check-in: bb85de9cd3 user: dan tags: compression-id
2013-02-06
19:43
Add API to register a compression-factory method with an lsm handle. check-in: 60908fd4d1 user: dan tags: compression-id
Changes
Unified Diff Ignore Whitespace Patch
Changes to src/lsm.h.
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
  int (*xFactory)(void *, lsm_db *, unsigned int);
  void (*xFree)(void *pCtx);
};

#define LSM_COMPRESSION_EMPTY 0
#define LSM_COMPRESSION_NONE  1


/*
** CAPI: Allocating and Freeing Memory
**
** Invoke the memory allocation functions that belong to environment
** pEnv. Or the system defaults if no memory allocation functions have 
** been registered.
*/







<







299
300
301
302
303
304
305

306
307
308
309
310
311
312
  int (*xFactory)(void *, lsm_db *, unsigned int);
  void (*xFree)(void *pCtx);
};

#define LSM_COMPRESSION_EMPTY 0
#define LSM_COMPRESSION_NONE  1


/*
** CAPI: Allocating and Freeing Memory
**
** Invoke the memory allocation functions that belong to environment
** pEnv. Or the system defaults if no memory allocation functions have 
** been registered.
*/
429
430
431
432
433
434
435





436
437
438
439
440
441
442
443
444
445
446
447
448
449

450
451
452
453
454
455
456
**   accumulate new data written to the database. The other tree structure -
**   the old tree - is a read-only tree holding older data and may be flushed 
**   to disk at any time.
** 
**   Assuming no error occurs, the location pointed to by the first of the two
**   (int *) arguments is set to the size of the old in-memory tree in KB.
**   The second is set to the size of the current, or live in-memory tree.





*/
#define LSM_INFO_NWRITE           1
#define LSM_INFO_NREAD            2
#define LSM_INFO_DB_STRUCTURE     3
#define LSM_INFO_LOG_STRUCTURE    4
#define LSM_INFO_ARRAY_STRUCTURE  5
#define LSM_INFO_PAGE_ASCII_DUMP  6
#define LSM_INFO_PAGE_HEX_DUMP    7
#define LSM_INFO_FREELIST         8
#define LSM_INFO_ARRAY_PAGES      9
#define LSM_INFO_CHECKPOINT_SIZE 10
#define LSM_INFO_TREE_SIZE       11

#define LSM_INFO_FREELIST_SIZE   12



/* 
** CAPI: Opening and Closing Write Transactions
**
** These functions are used to open and close transactions and nested 
** sub-transactions.







>
>
>
>
>












<

>







428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451

452
453
454
455
456
457
458
459
460
**   accumulate new data written to the database. The other tree structure -
**   the old tree - is a read-only tree holding older data and may be flushed 
**   to disk at any time.
** 
**   Assuming no error occurs, the location pointed to by the first of the two
**   (int *) arguments is set to the size of the old in-memory tree in KB.
**   The second is set to the size of the current, or live in-memory tree.
**
** LSM_INFO_COMPRESSION_ID:
**   This value should be followed by a single argument of type 
**   (unsigned int *). If successful, the location pointed to is populated 
**   with the database compression id before returning.
*/
#define LSM_INFO_NWRITE           1
#define LSM_INFO_NREAD            2
#define LSM_INFO_DB_STRUCTURE     3
#define LSM_INFO_LOG_STRUCTURE    4
#define LSM_INFO_ARRAY_STRUCTURE  5
#define LSM_INFO_PAGE_ASCII_DUMP  6
#define LSM_INFO_PAGE_HEX_DUMP    7
#define LSM_INFO_FREELIST         8
#define LSM_INFO_ARRAY_PAGES      9
#define LSM_INFO_CHECKPOINT_SIZE 10
#define LSM_INFO_TREE_SIZE       11

#define LSM_INFO_FREELIST_SIZE   12
#define LSM_INFO_COMPRESSION_ID  13


/* 
** CAPI: Opening and Closing Write Transactions
**
** These functions are used to open and close transactions and nested 
** sub-transactions.
Changes to src/lsmInt.h.
571
572
573
574
575
576
577


578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595

596
597
598
599
600
601
602

int lsmCheckpointSaveWorker(lsm_db *pDb, int);
int lsmDatabaseFull(lsm_db *pDb);
int lsmCheckpointSynced(lsm_db *pDb, i64 *piId, i64 *piLog, u32 *pnWrite);

int lsmCheckpointSize(lsm_db *db, int *pnByte);



/* 
** Functions from file "lsm_tree.c".
*/
int lsmTreeNew(lsm_env *, int (*)(void *, int, void *, int), Tree **ppTree);
void lsmTreeRelease(lsm_env *, Tree *);
int lsmTreeInit(lsm_db *);
int lsmTreeRepair(lsm_db *);

void lsmTreeMakeOld(lsm_db *pDb);
void lsmTreeDiscardOld(lsm_db *pDb);
int lsmTreeHasOld(lsm_db *pDb);

int lsmTreeSize(lsm_db *);
int lsmTreeEndTransaction(lsm_db *pDb, int bCommit);
int lsmTreeLoadHeader(lsm_db *pDb, int *);
int lsmTreeLoadHeaderOk(lsm_db *, int);

int lsmTreeInsert(lsm_db *pDb, void *pKey, int nKey, void *pVal, int nVal);

void lsmTreeRollback(lsm_db *pDb, TreeMark *pMark);
void lsmTreeMark(lsm_db *pDb, TreeMark *pMark);

int lsmTreeCursorNew(lsm_db *pDb, int, TreeCursor **);
void lsmTreeCursorDestroy(TreeCursor *);

int lsmTreeCursorSeek(TreeCursor *pCsr, void *pKey, int nKey, int *pRes);







>
>


















>







571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605

int lsmCheckpointSaveWorker(lsm_db *pDb, int);
int lsmDatabaseFull(lsm_db *pDb);
int lsmCheckpointSynced(lsm_db *pDb, i64 *piId, i64 *piLog, u32 *pnWrite);

int lsmCheckpointSize(lsm_db *db, int *pnByte);

int lsmInfoCompressionId(lsm_db *db, u32 *piCmpId);

/* 
** Functions from file "lsm_tree.c".
*/
int lsmTreeNew(lsm_env *, int (*)(void *, int, void *, int), Tree **ppTree);
void lsmTreeRelease(lsm_env *, Tree *);
int lsmTreeInit(lsm_db *);
int lsmTreeRepair(lsm_db *);

void lsmTreeMakeOld(lsm_db *pDb);
void lsmTreeDiscardOld(lsm_db *pDb);
int lsmTreeHasOld(lsm_db *pDb);

int lsmTreeSize(lsm_db *);
int lsmTreeEndTransaction(lsm_db *pDb, int bCommit);
int lsmTreeLoadHeader(lsm_db *pDb, int *);
int lsmTreeLoadHeaderOk(lsm_db *, int);

int lsmTreeInsert(lsm_db *pDb, void *pKey, int nKey, void *pVal, int nVal);
int lsmTreeDelete(lsm_db *db, void *pKey1, int nKey1, void *pKey2, int nKey2);
void lsmTreeRollback(lsm_db *pDb, TreeMark *pMark);
void lsmTreeMark(lsm_db *pDb, TreeMark *pMark);

int lsmTreeCursorNew(lsm_db *pDb, int, TreeCursor **);
void lsmTreeCursorDestroy(TreeCursor *);

int lsmTreeCursorSeek(TreeCursor *pCsr, void *pKey, int nKey, int *pRes);
Changes to src/lsm_ckpt.c.
447
448
449
450
451
452
453



454
455
456
457
458
459
460
461
462
463
464
      ckptSetValue(&ckpt, iOut++, (p->iId >> 32) & 0xFFFFFFFF, &rc);
      ckptSetValue(&ckpt, iOut++, p->iId & 0xFFFFFFFF, &rc);
    }
  }

  /* Write the checkpoint header */
  assert( iId>=0 );



  ckptSetValue(&ckpt, CKPT_HDR_ID_MSW, (u32)(iId>>32), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_ID_LSW, (u32)(iId&0xFFFFFFFF), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NCKPT, iOut+2, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_CMPID, pSnap->iCmpId, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NBLOCK, pSnap->nBlock, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_BLKSZ, lsmFsBlockSize(pFS), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NLEVEL, nLevel, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_PGSZ, lsmFsPageSize(pFS), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NWRITE, pSnap->nWrite, &rc);

  if( bCksum ){







>
>
>



|







447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
      ckptSetValue(&ckpt, iOut++, (p->iId >> 32) & 0xFFFFFFFF, &rc);
      ckptSetValue(&ckpt, iOut++, p->iId & 0xFFFFFFFF, &rc);
    }
  }

  /* Write the checkpoint header */
  assert( iId>=0 );
  assert( pSnap->iCmpId==pDb->compress.iId
       || pSnap->iCmpId==LSM_COMPRESSION_EMPTY 
  );
  ckptSetValue(&ckpt, CKPT_HDR_ID_MSW, (u32)(iId>>32), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_ID_LSW, (u32)(iId&0xFFFFFFFF), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NCKPT, iOut+2, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_CMPID, pDb->compress.iId, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NBLOCK, pSnap->nBlock, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_BLKSZ, lsmFsBlockSize(pFS), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NLEVEL, nLevel, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_PGSZ, lsmFsPageSize(pFS), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NWRITE, pSnap->nWrite, &rc);

  if( bCksum ){
878
879
880
881
882
883
884












885
886
887
888
889
890
891
      }
    }

    lsmShmBarrier(pDb);
  }
  return LSM_PROTOCOL;
}













/*
** Return non-zero if the checkpoint id read from pDb->aSnapshot is equal
** to the checkpoint id read from shared-memory snapshot slot iSnap, or
** zero otherwise. iSnap must be 1 (selects aSnap1) or 2 (selects aSnap2).
*/
int lsmCheckpointLoadOk(lsm_db *pDb, int iSnap){
  u32 *aSlot;                     /* Shared-memory snapshot to compare with */

  assert( iSnap==1 || iSnap==2 );
  if( iSnap==1 ){
    aSlot = pDb->pShmhdr->aSnap1;
  }else{
    aSlot = pDb->pShmhdr->aSnap2;
  }

  return lsmCheckpointId(pDb->aSnapshot, 0)==lsmCheckpointId(aSlot, 0);
}







>
>
>
>
>
>
>
>
>
>
>
>







881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
      }
    }

    lsmShmBarrier(pDb);
  }
  return LSM_PROTOCOL;
}

/*
** Helper for the LSM_INFO_COMPRESSION_ID request. Calls
** lsmCheckpointLoad(db, 0) and, if that returns LSM_OK, copies the
** CKPT_HDR_CMPID field of db->aSnapshot into *piCmpId.
**
** Must only be called while the connection holds neither a client nor a
** worker snapshot (asserted). On failure the error code is returned and
** *piCmpId is left unmodified.
*/
int lsmInfoCompressionId(lsm_db *db, u32 *piCmpId){
  int rc;

  assert( db->pClient==0 );
  assert( db->pWorker==0 );

  rc = lsmCheckpointLoad(db, 0);
  if( rc!=LSM_OK ) return rc;

  *piCmpId = db->aSnapshot[CKPT_HDR_CMPID];
  return LSM_OK;
}

/*
** Return non-zero if the checkpoint id read from pDb->aSnapshot is equal
** to the checkpoint id read from shared-memory snapshot slot iSnap, or
** zero otherwise. iSnap must be 1 (selects aSnap1) or 2 (selects aSnap2).
*/
int lsmCheckpointLoadOk(lsm_db *pDb, int iSnap){
  u32 *aSlot;                     /* Shared-memory snapshot to compare with */

  assert( iSnap==1 || iSnap==2 );
  if( iSnap==1 ){
    aSlot = pDb->pShmhdr->aSnap1;
  }else{
    aSlot = pDb->pShmhdr->aSnap2;
  }

  return lsmCheckpointId(pDb->aSnapshot, 0)==lsmCheckpointId(aSlot, 0);
}
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
    int iIn = CKPT_HDR_SIZE + CKPT_APPENDLIST_SIZE + CKPT_LOGPTR_SIZE;

    pNew->iId = lsmCheckpointId(aCkpt, 0);
    pNew->nBlock = aCkpt[CKPT_HDR_NBLOCK];
    pNew->nWrite = aCkpt[CKPT_HDR_NWRITE];
    rc = ckptLoadLevels(pDb, aCkpt, &iIn, nLevel, &pNew->pLevel);
    pNew->iLogOff = lsmCheckpointLogOffset(aCkpt);

    pNew->iCmpId = aCkpt[CKPT_HDR_CMPID];
    if( pNew->iCmpId==LSM_COMPRESSION_EMPTY ){
      pNew->iCmpId = pDb->compress.iId;
    }

    /* Make a copy of the append-list */
    for(i=0; i<LSM_APPLIST_SZ; i++){
      u32 *a = &aCkpt[CKPT_HDR_SIZE + CKPT_LOGPTR_SIZE + i*2];
      pNew->aiAppend[i] = ckptRead64(a);
    }








<

<
<
<







969
970
971
972
973
974
975

976



977
978
979
980
981
982
983
    int iIn = CKPT_HDR_SIZE + CKPT_APPENDLIST_SIZE + CKPT_LOGPTR_SIZE;

    pNew->iId = lsmCheckpointId(aCkpt, 0);
    pNew->nBlock = aCkpt[CKPT_HDR_NBLOCK];
    pNew->nWrite = aCkpt[CKPT_HDR_NWRITE];
    rc = ckptLoadLevels(pDb, aCkpt, &iIn, nLevel, &pNew->pLevel);
    pNew->iLogOff = lsmCheckpointLogOffset(aCkpt);

    pNew->iCmpId = aCkpt[CKPT_HDR_CMPID];




    /* Make a copy of the append-list */
    for(i=0; i<LSM_APPLIST_SZ; i++){
      u32 *a = &aCkpt[CKPT_HDR_SIZE + CKPT_LOGPTR_SIZE + i*2];
      pNew->aiAppend[i] = ckptRead64(a);
    }

Changes to src/lsm_main.c.
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
  return 0;
}

/*
** Produce a text rendering of the database free-list.
**
** The worker snapshot is obtained with infoGetWorker(), the free-list is
** walked with lsmWalkFreelist() using infoFreelistCb() to append to an
** LsmString, and the worker is released again with infoFreeWorker().
**
** On success LSM_OK is returned and *pzOut is set to the accumulated
** string buffer (presumably to be released by the caller - confirm
** against the lsm_info() documentation). On failure an error code is
** returned, the buffer is freed here and *pzOut is left unmodified.
*/
int lsmInfoFreelist(lsm_db *pDb, char **pzOut){
  Snapshot *pWorker;              /* Worker snapshot */
  int bUnlock = 0;                /* Passed back to infoFreeWorker() */
  LsmString s;                    /* Accumulates the output text */
  int rc;

  /* Obtain the worker snapshot */
  rc = infoGetWorker(pDb, &pWorker, &bUnlock);
  if( rc!=LSM_OK ) return rc;

  lsmStringInit(&s, pDb->pEnv);
  rc = lsmWalkFreelist(pDb, 0, infoFreelistCb, &s);
  if( rc!=LSM_OK ){
    lsmFree(pDb->pEnv, s.z);
  }else{
    *pzOut = s.z;
  }

  /* Release the snapshot and return */
  infoFreeWorker(pDb, bUnlock);
  return rc;
}

/*
** NOTE(review): empty stub. The body contains no statements and no
** return, even though the function is declared to return int, so any
** caller that used the result would read an indeterminate value. Nothing
** visible here calls it - either implement it or remove it.
*/
static int infoFreelistSize(lsm_db *pDb, int *pnFree, int *pnWaiting){
}

static int infoTreeSize(lsm_db *db, int *pnOldKB, int *pnNewKB){
  ShmHeader *pShm = db->pShmhdr;
  TreeHeader *p = &pShm->hdr1;

  /* The following code suffers from two race conditions, as it accesses and
  ** trusts the contents of shared memory without verifying checksums:
  **







<



















<
<
<







455
456
457
458
459
460
461

462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480



481
482
483
484
485
486
487
  return 0;
}

/*
** Produce a text rendering of the database free-list.
**
** The worker snapshot is obtained with infoGetWorker(), the free-list is
** walked with lsmWalkFreelist() using infoFreelistCb() to append to an
** LsmString, and the worker is released again with infoFreeWorker().
**
** On success LSM_OK is returned and *pzOut is set to the accumulated
** string buffer (presumably to be released by the caller - confirm
** against the lsm_info() documentation). On failure an error code is
** returned, the buffer is freed here and *pzOut is left unmodified.
*/
int lsmInfoFreelist(lsm_db *pDb, char **pzOut){
  Snapshot *pSnap;                /* Worker snapshot */
  int bRelease = 0;               /* Passed back to infoFreeWorker() */
  LsmString out;                  /* Accumulates the output text */
  int rc;

  rc = infoGetWorker(pDb, &pSnap, &bRelease);
  if( rc!=LSM_OK ) return rc;

  lsmStringInit(&out, pDb->pEnv);
  rc = lsmWalkFreelist(pDb, 0, infoFreelistCb, &out);
  if( rc==LSM_OK ){
    *pzOut = out.z;
  }else{
    lsmFree(pDb->pEnv, out.z);
  }

  infoFreeWorker(pDb, bRelease);
  return rc;
}




static int infoTreeSize(lsm_db *db, int *pnOldKB, int *pnNewKB){
  ShmHeader *pShm = db->pShmhdr;
  TreeHeader *p = &pShm->hdr1;

  /* The following code suffers from two race conditions, as it accesses and
  ** trusts the contents of shared memory without verifying checksums:
  **
591
592
593
594
595
596
597










598
599
600
601
602
603
604

    case LSM_INFO_TREE_SIZE: {
      int *pnOld = va_arg(ap, int *);
      int *pnNew = va_arg(ap, int *);
      rc = infoTreeSize(pDb, pnOld, pnNew);
      break;
    }











    default:
      rc = LSM_MISUSE;
      break;
  }

  va_end(ap);







>
>
>
>
>
>
>
>
>
>







587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610

    case LSM_INFO_TREE_SIZE: {
      int *pnOld = va_arg(ap, int *);
      int *pnNew = va_arg(ap, int *);
      rc = infoTreeSize(pDb, pnOld, pnNew);
      break;
    }

    case LSM_INFO_COMPRESSION_ID: {
      unsigned int *piOut = va_arg(ap, unsigned int *);
      if( pDb->pClient ){
        *piOut = pDb->pClient->iCmpId;
      }else{
        rc = lsmInfoCompressionId(pDb, piOut);
      }
      break;
    }

    default:
      rc = LSM_MISUSE;
      break;
  }

  va_end(ap);
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
  assert_db_state( pDb );

  /* A value less than zero means close the innermost nested transaction. */
  if( iLevel<0 ) iLevel = LSM_MAX(0, pDb->nTransOpen - 1);

  if( iLevel<pDb->nTransOpen ){
    if( iLevel==0 ){
      int bAutowork = 0;

      /* Commit the transaction to disk. */
      if( rc==LSM_OK ) rc = lsmLogCommit(pDb);
      if( rc==LSM_OK && pDb->eSafety==LSM_SAFETY_FULL ){
        rc = lsmFsSyncLog(pDb->pFS);
      }
      lsmFinishWriteTrans(pDb, (rc==LSM_OK));
    }







<
<







868
869
870
871
872
873
874


875
876
877
878
879
880
881
  assert_db_state( pDb );

  /* A value less than zero means close the innermost nested transaction. */
  if( iLevel<0 ) iLevel = LSM_MAX(0, pDb->nTransOpen - 1);

  if( iLevel<pDb->nTransOpen ){
    if( iLevel==0 ){


      /* Commit the transaction to disk. */
      if( rc==LSM_OK ) rc = lsmLogCommit(pDb);
      if( rc==LSM_OK && pDb->eSafety==LSM_SAFETY_FULL ){
        rc = lsmFsSyncLog(pDb->pFS);
      }
      lsmFinishWriteTrans(pDb, (rc==LSM_OK));
    }
Changes to src/lsm_shared.c.
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927














928
929
930
931
932
933
934
935

936
937
938

939
940
941
942
943
944
945
    lsmFreeSnapshot(pDb->pEnv, pDb->pWorker);
    pDb->pWorker = 0;
  }

  lsmShmLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_UNLOCK, 0);
}


/*
** Invoked once recovery has completed. Ends the in-memory tree
** transaction by calling lsmTreeEndTransaction() with its second
** argument set to 1. The result of that call is deliberately discarded;
** this function always returns LSM_OK.
*/
int lsmFinishRecovery(lsm_db *pDb){
  (void)lsmTreeEndTransaction(pDb, 1);
  return LSM_OK;
}















/*
** Check that the compression functions currently configured on pDb have
** compression id iReq.
**
** If they do not and a compression factory is registered, the factory is
** invoked once (with pDb->bInFactory set for the duration of the call) so
** that it may install different compression functions. The factory's own
** return value is ignored; only the resulting compression id matters.
**
** Returns LSM_OK if the configured id equals iReq once any factory has
** run, or LSM_MISMATCH otherwise.
*/
int lsmCheckCompressionId(lsm_db *pDb, u32 iReq){
  if( pDb->compress.iId==iReq ) return LSM_OK;

  if( pDb->factory.xFactory ){
    pDb->bInFactory = 1;
    pDb->factory.xFactory(pDb->factory.pCtx, pDb, iReq);
    pDb->bInFactory = 0;
  }

  return (pDb->compress.iId==iReq) ? LSM_OK : LSM_MISMATCH;
}

/*
** Begin a read transaction. This function is a no-op if the connection
** passed as the only argument already has an open read transaction.
*/







<








>
>
>
>
>
>
>
>
>
>
>
>
>
>

|






>



>







912
913
914
915
916
917
918

919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
    lsmFreeSnapshot(pDb->pEnv, pDb->pWorker);
    pDb->pWorker = 0;
  }

  lsmShmLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_UNLOCK, 0);
}


/*
** Invoked once recovery has completed. Ends the in-memory tree
** transaction by calling lsmTreeEndTransaction() with its second
** argument set to 1. The result of that call is deliberately discarded;
** this function always returns LSM_OK.
*/
int lsmFinishRecovery(lsm_db *pDb){
  (void)lsmTreeEndTransaction(pDb, 1);
  return LSM_OK;
}

/*
** Determine whether the compression functions currently configured on pDb
** (LSM_CONFIG_SET_COMPRESSION) can be used with a database whose
** compression id is iReq. They are compatible if iReq is
** LSM_COMPRESSION_EMPTY (zero, indicating an empty database) or if iReq
** equals the id of the configured compression functions.
**
** If the configured functions are incompatible and a compression factory
** is registered, the factory is invoked once (with pDb->bInFactory set
** for the duration of the call) so that it may install different
** compression functions. The factory's own return value is ignored.
**
** Returns LSM_OK if the functions are compatible once any factory has
** run, or LSM_MISMATCH otherwise.
*/
int lsmCheckCompressionId(lsm_db *pDb, u32 iReq){
  /* Compatible as-is: empty database, or ids already match */
  if( iReq==LSM_COMPRESSION_EMPTY || pDb->compress.iId==iReq ){
    return LSM_OK;
  }

  if( pDb->factory.xFactory ){
    pDb->bInFactory = 1;
    pDb->factory.xFactory(pDb->factory.pCtx, pDb, iReq);
    pDb->bInFactory = 0;
  }

  return (pDb->compress.iId==iReq) ? LSM_OK : LSM_MISMATCH;
}

/*
** Begin a read transaction. This function is a no-op if the connection
** passed as the only argument already has an open read transaction.
*/
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
            rc = lsmCheckpointDeserialize(pDb, 0, pDb->aSnapshot,&pDb->pClient);
          }
          assert( (rc==LSM_OK)==(pDb->pClient!=0) );
          assert( pDb->iReader>=0 );

          /* Check that the client has the right compression hooks loaded.
          ** If not, set rc to LSM_MISMATCH.  */
          assert( rc!=LSM_OK || pDb->pClient->iCmpId!=LSM_COMPRESSION_EMPTY );
          if( rc==LSM_OK && pDb->pClient->iCmpId!=pDb->compress.iId ){
            rc = lsmCheckCompressionId(pDb, pDb->pClient->iCmpId);
          }
        }else{
          rc = lsmReleaseReadlock(pDb);
        }
      }








<
|







1006
1007
1008
1009
1010
1011
1012

1013
1014
1015
1016
1017
1018
1019
1020
            rc = lsmCheckpointDeserialize(pDb, 0, pDb->aSnapshot,&pDb->pClient);
          }
          assert( (rc==LSM_OK)==(pDb->pClient!=0) );
          assert( pDb->iReader>=0 );

          /* Check that the client has the right compression hooks loaded.
          ** If not, set rc to LSM_MISMATCH.  */

          if( rc==LSM_OK ){
            rc = lsmCheckCompressionId(pDb, pDb->pClient->iCmpId);
          }
        }else{
          rc = lsmReleaseReadlock(pDb);
        }
      }

Changes to test/lsm4.test.
11
12
13
14
15
16
17






18
19
20
21
22
23
24
#

set testdir [file dirname $argv0]
source $testdir/tester.tcl
set testprefix lsm4
db close







# Return the value stored under KEY in the database whose handle command
# is named by DB.
#
# A cursor command named "csr" is opened on the handle, positioned on KEY
# with an "eq" seek, read, and closed again before returning.
#
# The handle passed in DB is the one actually used. Previously the literal
# command "db" was invoked regardless of the argument.
proc db_fetch {db key} {
  $db csr_open csr
  csr seek $key eq
  set ret [csr value]
  csr close
  return $ret
}







>
>
>
>
>
>







11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#

set testdir [file dirname $argv0]
source $testdir/tester.tcl
set testprefix lsm4
db close

# Compression scheme ids (defined in test_lsm.c):
#
set compression_id(encrypt) 43
set compression_id(rle)     44
set compression_id(noop)    45

# Return the value stored under KEY in the database whose handle command
# is named by DB.
#
# A cursor command named "csr" is opened on the handle, positioned on KEY
# with an "eq" seek, read, and closed again before returning.
#
# The handle passed in DB is the one actually used. Previously the literal
# command "db" was invoked regardless of the argument.
proc db_fetch {db key} {
  $db csr_open csr
  csr seek $key eq
  set ret [csr value]
  csr close
  return $ret
}
51
52
53
54
55
56
57


58

















































59
60
61
} {1 {error in lsm_csr_open() - 50}}

do_test 1.5 {
  db config {set_compression_factory true}
  list [db_fetch db 1] [db_fetch db 2]
} {abc def}






















































finish_test








>
>

>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
} {1 {error in lsm_csr_open() - 50}}

do_test 1.5 {
  db config {set_compression_factory true}
  list [db_fetch db 1] [db_fetch db 2]
} {abc def}

do_test 1.6 { db info compression_id } $compression_id(noop)
db close

#-------------------------------------------------------------------------
# Tests 2.*: values reported by "info compression_id".
#
# In lsm.h, 0 is LSM_COMPRESSION_EMPTY and 1 is LSM_COMPRESSION_NONE.
#
forcedelete test.db

# 2.1: a newly created database reports the "empty" id.
do_test 2.1 {
  lsm_open db test.db
  db info compression_id
} {0}

# 2.2: writing keys does not by itself change the reported id.
do_test 2.2 {
  db write 1 abc
  db write 2 abc
  db info compression_id
} {0}

# 2.3: a second connection opened on the same file sees the same id.
do_test 2.3 {
  lsm_open db2 test.db
  db2 info compression_id
} {0}

# 2.4: closing the first connection while db2 is still open leaves the
# reported id unchanged.
do_test 2.4 {
  db close
  db2 info compression_id
} {0}

# 2.5: after every connection has been closed and the file is reopened,
# the id is LSM_COMPRESSION_NONE. NOTE(review): presumably the checkpoint
# written when the last connection closes records the id of the configured
# (default) compression functions - confirm against lsm_ckpt.c.
do_test 2.5 {
  db2 close
  lsm_open db test.db
  db info compression_id
} {1}

db close
forcedelete test.db

# 2.6: a database populated with the "rle" test compression configured
# reports the rle id once it has been closed and reopened.
do_test 2.6 {
  lsm_open db test.db
  db config {set_compression rle}
  db write 3 three
  db write 4 four
  db close

  lsm_open db test.db
  db info compression_id
} $compression_id(rle)

# 2.7: with rle configured on the reopened handle the stored values can be
# read back.
do_test 2.7 {
  db config {set_compression rle}
  list [db_fetch db 3] [db_fetch db 4]
} {three four}

finish_test

Changes to test/test_lsm.c.
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#include <string.h>

extern int getDbPointer(Tcl_Interp *interp, const char *zA, sqlite4 **ppDb);
extern const char *sqlite4TestErrorName(int);

/*************************************************************************
*/
#define ENCRYPTION_XOR_MASK 0xa3b2bbb6
/*
** Size-bound callback of the "encrypt" test compression routines.
** Returns nSrc unchanged. pCtx is not used.
*/
static int testCompressEncBound(void *pCtx, int nSrc){
  int nBound = nSrc;
  (void)pCtx;
  return nBound;
}
static int testCompressEncCompress(
  void *pCtx, 
  char *pOut, int *pnOut, 
  const char *pIn, int nIn







|







18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#include <string.h>

extern int getDbPointer(Tcl_Interp *interp, const char *zA, sqlite4 **ppDb);
extern const char *sqlite4TestErrorName(int);

/*************************************************************************
*/
#define ENCRYPTION_XOR_MASK 0x23b2bbb6
/*
** Size-bound callback of the "encrypt" test compression routines.
** Returns nSrc unchanged. pCtx is not used.
*/
static int testCompressEncBound(void *pCtx, int nSrc){
  int nBound = nSrc;
  (void)pCtx;
  return nBound;
}
static int testCompressEncCompress(
  void *pCtx, 
  char *pOut, int *pnOut, 
  const char *pIn, int nIn
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
  return LSM_OK;
}
/*
** Uncompress callback of the "encrypt" test compression routines.
**
** Delegates to testCompressEncCompress() with the same arguments and
** returns its result (presumably because the masking transform applied
** there is its own inverse - confirm against testCompressEncCompress()).
**
** This function previously called itself, which recursed without bound
** the first time any data had to be decoded.
*/
static int testCompressEncUncompress(
  void *pCtx, 
  char *pOut, int *pnOut, 
  const char *pIn, int nIn
){
  return testCompressEncCompress(pCtx, pOut, pnOut, pIn, nIn);
}
/*
** Free callback of the "encrypt" test compression routines. There is
** nothing to release, so this does nothing with pCtx.
*/
static void testCompressEncFree(void *pCtx){
  (void)pCtx;
}
/* 
** End of compression routines "encrypt".
*************************************************************************/







|







44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
  return LSM_OK;
}
/*
** Uncompress callback of the "encrypt" test compression routines.
**
** Delegates to testCompressEncCompress() with the same arguments and
** returns its result (presumably because the masking transform applied
** there is its own inverse - confirm against testCompressEncCompress()).
*/
static int testCompressEncUncompress(
  void *pCtx, 
  char *pOut, int *pnOut, 
  const char *pIn, int nIn
){
  int rc = testCompressEncCompress(pCtx, pOut, pnOut, pIn, nIn);
  return rc;
}
/*
** Free callback of the "encrypt" test compression routines. There is
** nothing to release, so this does nothing with pCtx.
*/
static void testCompressEncFree(void *pCtx){
  (void)pCtx;
}
/* 
** End of compression routines "encrypt".
*************************************************************************/
535
536
537
538
539
540
541

542
543
544
545
546
547
548
        }
      }
    }
  }

  return rc;
}


typedef struct TclLsmCursor TclLsmCursor;
typedef struct TclLsm TclLsm;

/*
** Client data attached to a Tcl command that wraps an LSM database
** handle. test_lsm_del() closes the handle and frees the structure.
*/
struct TclLsm {
  lsm_db *db;                     /* The wrapped LSM database handle */
};







>







535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
        }
      }
    }
  }

  return rc;
}


typedef struct TclLsmCursor TclLsmCursor;
typedef struct TclLsm TclLsm;

/*
** Client data attached to a Tcl command that wraps an LSM database
** handle. test_lsm_del() closes the handle and frees the structure.
*/
struct TclLsm {
  lsm_db *db;                     /* The wrapped LSM database handle */
};
574
575
576
577
578
579
580
































581
582
583
584
585
586
587
/*
** Destructor for a TclLsm object passed as an untyped pointer. If ctx is
** not NULL, close the wrapped LSM handle with lsm_close() (its return
** value is not checked) and release the TclLsm structure with ckfree().
** A NULL ctx is ignored.
*/
static void test_lsm_del(void *ctx){
  TclLsm *pLsm = (TclLsm *)ctx;
  if( pLsm==0 ) return;
  lsm_close(pLsm->db);
  ckfree((char *)pLsm);
}

































/*
** Usage: CSR sub-command ...
*/
static int test_lsm_cursor_cmd(
  void * clientData,
  Tcl_Interp *interp,







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
/*
** Destructor for a TclLsm object passed as an untyped pointer. If ctx is
** not NULL, close the wrapped LSM handle with lsm_close() (its return
** value is not checked) and release the TclLsm structure with ckfree().
** A NULL ctx is ignored.
*/
static void test_lsm_del(void *ctx){
  TclLsm *pLsm = (TclLsm *)ctx;
  if( pLsm==0 ) return;
  lsm_close(pLsm->db);
  ckfree((char *)pLsm);
}

/*
** Implementation of the "info OPTION" sub-command of an LSM handle
** command. pObj holds OPTION; the only option recognised is
** "compression_id", which sets the interpreter result to the value
** reported by lsm_info(db, LSM_INFO_COMPRESSION_ID, ...).
**
** Returns a Tcl status code. If OPTION is not recognised the status from
** Tcl_GetIndexFromObjStruct() is returned. If lsm_info() fails, the
** status produced by test_lsm_error() is returned rather than the raw
** LSM error code, matching how the other sub-commands report LSM
** failures (e.g. "checkpoint").
*/
static int testInfoLsm(Tcl_Interp *interp, lsm_db *db, Tcl_Obj *pObj){
  struct Lsminfo {
    const char *zOpt;
    int eOpt;
  } aInfo[] = {
    { "compression_id",          LSM_INFO_COMPRESSION_ID },
    { 0, 0 }
  };
  int rc;                         /* Tcl status code to return */
  int iOpt;                       /* Index of OPTION within aInfo[] */

  rc = Tcl_GetIndexFromObjStruct(
      interp, pObj, aInfo, sizeof(aInfo[0]), "option", 0, &iOpt
  );
  if( rc!=TCL_OK ) return rc;

  switch( aInfo[iOpt].eOpt ){
    case LSM_INFO_COMPRESSION_ID: {
      unsigned int iCmpId = 0;
      int rcLsm = lsm_info(db, LSM_INFO_COMPRESSION_ID, &iCmpId);
      if( rcLsm==LSM_OK ){
        Tcl_SetObjResult(interp, Tcl_NewWideIntObj((Tcl_WideInt)iCmpId));
      }else{
        /* Convert the LSM error into a Tcl status and error message */
        rc = test_lsm_error(interp, "lsm_info", rcLsm);
      }
      break;
    }
  }

  return rc;
}

/*
** Usage: CSR sub-command ...
*/
static int test_lsm_cursor_cmd(
  void * clientData,
  Tcl_Interp *interp,
720
721
722
723
724
725
726

727
728
729
730
731
732
733
    /*  5 */ {"commit",       1, "LEVEL"},
    /*  6 */ {"rollback",     1, "LEVEL"},
    /*  7 */ {"csr_open",     1, "CSR"},
    /*  8 */ {"work",        -1, "?NMERGE? NPAGE"},
    /*  9 */ {"flush",        0, ""},
    /* 10 */ {"config",       1, "LIST"},
    /* 11 */ {"checkpoint",   0, ""},

    {0, 0, 0}
  };
  int iCmd;
  int rc;
  TclLsm *p = (TclLsm *)clientData;

  if( objc<2 ){







>







753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
    /*  5 */ {"commit",       1, "LEVEL"},
    /*  6 */ {"rollback",     1, "LEVEL"},
    /*  7 */ {"csr_open",     1, "CSR"},
    /*  8 */ {"work",        -1, "?NMERGE? NPAGE"},
    /*  9 */ {"flush",        0, ""},
    /* 10 */ {"config",       1, "LIST"},
    /* 11 */ {"checkpoint",   0, ""},
    /* 12 */ {"info",         1, "OPTION"},
    {0, 0, 0}
  };
  int iCmd;
  int rc;
  TclLsm *p = (TclLsm *)clientData;

  if( objc<2 ){
854
855
856
857
858
859
860




861
862
863
864
865
866
867
      return testConfigureLsm(interp, p->db, objv[2]);
    }

    case 11: assert( 0==strcmp(aCmd[11].zCmd, "checkpoint") ); {
      rc = lsm_checkpoint(p->db, 0);
      return test_lsm_error(interp, "lsm_checkpoint", rc);
    }





    default:
      assert( 0 );
  }

  Tcl_AppendResult(interp, "internal error", 0);
  return TCL_ERROR;







>
>
>
>







888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
      return testConfigureLsm(interp, p->db, objv[2]);
    }

    case 11: assert( 0==strcmp(aCmd[11].zCmd, "checkpoint") ); {
      rc = lsm_checkpoint(p->db, 0);
      return test_lsm_error(interp, "lsm_checkpoint", rc);
    }

    case 12: assert( 0==strcmp(aCmd[12].zCmd, "info") ); {
      return testInfoLsm(interp, p->db, objv[2]);
    }

    default:
      assert( 0 );
  }

  Tcl_AppendResult(interp, "internal error", 0);
  return TCL_ERROR;
Changes to www/lsmusr.wiki.
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
the next entry. After lsm_csr_next() is called to advance past the final
entry in the database, the cursor is left pointing to no entry at all,
lsm_csr_valid() returns 0, and the loop is finished. API function 
<a href=lsmapi.wiki#lsm_csr_key>lsm_csr_key()</a> is used to retrieve the
key associated with each database entry visited.

<verbatim>
  for(rc = lsm_csr_first(csr); lsm_csr_valid(csr); rc = lsm_csr_next(csr)){
    const void *pKey; int nKey;
    const void *pVal; int nVal;

    rc = lsm_csr_key(csr, &pKey, &nKey);
    if( rc==LSM_OK ) rc = lsm_csr_value(csr, &pVal, &nVal);
    if( rc!=LSM_OK ) break;

    /* At this point pKey points to the current key (size nKey bytes) and
    ** pVal points to the corresponding value (size nVal bytes).  */
  }
</verbatim>

<p> The example code above could be modified to iterate backwards through







|





|







370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
the next entry. After lsm_csr_next() is called to advance past the final
entry in the database, the cursor is left pointing to no entry at all,
lsm_csr_valid() returns 0, and the loop is finished. API function 
<a href=lsmapi.wiki#lsm_csr_key>lsm_csr_key()</a> is used to retrieve the
key associated with each database entry visited.

<verbatim>
  for(rc=lsm_csr_first(csr); rc==LSM_OK && lsm_csr_valid(csr); rc=lsm_csr_next(csr)){
    const void *pKey; int nKey;
    const void *pVal; int nVal;

    rc = lsm_csr_key(csr, &pKey, &nKey);
    if( rc==LSM_OK ) rc = lsm_csr_value(csr, &pVal, &nVal);
    if( rc!=LSM_OK ) break;

    /* At this point pKey points to the current key (size nKey bytes) and
    ** pVal points to the corresponding value (size nVal bytes).  */
  }
</verbatim>

<p> The example code above could be modified to iterate backwards through
723
724
725
726
727
728
729
730

731
732
733
734
735
736
737
738
739
740
741
742










743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763

764
765
766









767





768







769
770

771
772
773
774
775
776
777
    int (*xBound)(void *pCtx, int nIn);
    int (*xCompress)(void *pCtx, void *pOut, int *pnOut, const void *pIn, int nIn);
    int (*xUncompress)(void *pCtx, void *pOut, int *pnOut, const void *pIn, int nIn);
    void (*xFree)(void *pCtx);
  };
</verbatim>

<p><i> Explain how the hooks work here (same as zipvfs) </i>


<p><i> Example code? Using zlib? Or something simple like an RLE
implementation?</i>

<p>The database file header of any LSM database contains a 32-bit unsigned
"compression id" field. If the database is not a compressed database, this
field is set to 1. Otherwise, it is set to an application supplied value
identifying the compression and/or encryption scheme in use. Application
compression scheme ids must be greater than or equal to 10000. Values smaller
than 10000 are reserved for internal use.

<p>The lsm_compression_id() API may be used to read the compression id from










a database connection. Because the compression id is stored in the database
header, it may be read before any required compression or encryption hooks
are configured.

<verbatim>
  #define LSM_COMPRESSION_EMPTY    0
  #define LSM_COMPRESSION_NONE     1
  int lsm_compression_id(lsm_db *db, u32 *piId);
</verbatim>

<p>When a database is opened for the first time, before it is first written,
the compression id field is set to LSM_COMPRESSION_EMPTY (0). The first time
a transaction is committed, the database compression id is set to a copy of 
the lsm_compress.iId field of the compression hooks for the database handle
committing the transaction, or to LSM_COMPRESSION_NONE (1) if no compression
hooks are configured.

<p>Once the compression id is set to something other than 
LSM_COMPRESSION_EMPTY, when a database handle opens a read or write 
transaction on the database, the compression id is compared against the 
lsm_compress.iId field of the configured compression hooks, or against LSM_COMPRESSION_NONE if no compression hooks are configured. If the compression id

does not match, then an LSM_MISMATCH error is returned and the operation 
fails (no transaction or database cursor is opened).










<p><i>Maybe there should be a way to register a mismatch-handler callback.





Otherwise, applications have to handle LSM_MISMATCH everywhere...







</i>



<h1 id=performance_tuning>6. Performance Tuning</h1>

<p> This section describes the various measures that can be taken in order to
fine-tune LSM in order to improve performance in specific circumstances.
Sub-section 6.1 contains a high-level overview of the 
<a href=#overview_of_lsm_architecture>system architecture</a>







|
>

|
|








|
>
>
>
>
>
>
>
>
>
>
|






<



|
|
|
|
|


|
|
|
>
|
|

>
>
>
>
>
>
>
>
>
|
>
>
>
>
>
|
>
>
>
>
>
>
>
|

>







723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760

761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
    int (*xBound)(void *pCtx, int nIn);
    int (*xCompress)(void *pCtx, void *pOut, int *pnOut, const void *pIn, int nIn);
    int (*xUncompress)(void *pCtx, void *pOut, int *pnOut, const void *pIn, int nIn);
    void (*xFree)(void *pCtx);
  };
</verbatim>

<p><span style=color:red> Explain how the hooks work here (same as zipvfs)
</span>

<p><span style=color:red> Example code? Using zlib? Or something simple like an
RLE implementation?</span>

<p>The database file header of any LSM database contains a 32-bit unsigned
"compression id" field. If the database is not a compressed database, this
field is set to 1. Otherwise, it is set to an application supplied value
identifying the compression and/or encryption scheme in use. Application
compression scheme ids must be greater than or equal to 10000. Values smaller
than 10000 are reserved for internal use.

<p>The lsm_info() API may be used to read the compression id from a database 
connection as follows: 

<verbatim>
  unsigned int iCompressionId;
  rc = lsm_info(db, LSM_INFO_COMPRESSION_ID, &iCompressionId);
  if( rc==LSM_OK ){
    /* Variable iCompressionId now contains the db compression id */
  }
</verbatim>

<p>Because the compression id is stored in the database
header, it may be read before any required compression or encryption hooks
are configured.

<verbatim>
  #define LSM_COMPRESSION_EMPTY    0
  #define LSM_COMPRESSION_NONE     1

</verbatim>

<p>When a database is opened for the first time, before it is first written,
the compression id field is set to LSM_COMPRESSION_EMPTY (0). After data is
written into the database file, the database compression id is set to a copy 
of the lsm_compress.iId field of the compression hooks for the database handle
doing the writing, or to LSM_COMPRESSION_NONE (1) if no compression hooks 
are configured.

<p>Once the compression id is set to something other than 
LSM_COMPRESSION_EMPTY, when a database handle attempts to read or write the
database file, the compression id is compared against the lsm_compress.iId 
field of the configured compression hooks, or against LSM_COMPRESSION_NONE if
no compression hooks are configured. If the compression id does not match, then
an LSM_MISMATCH error is returned and the operation fails (no transaction or
database cursor is opened).

<p>It is also possible to register a compression factory callback with a 
database handle. If one is registered, the compression factory callback is
invoked instead of returning LSM_MISMATCH if the configured compression hooks
do not match the compression id of a database. If the callback registers
compatible compression hooks with the database handle (using the normal
lsm_config() interface), then the database read or write operation resumes
after it returns. Otherwise, if the compression factory callback does not
register new, compatible, compression hooks with the database handle,
LSM_MISMATCH is returned to the user.

<p>A compression factory callback is registered with a database handle
by calling lsm_config() with the second argument set to
LSM_CONFIG_SET_COMPRESSION_FACTORY, and the third argument set to point to
an instance of structure lsm_compress_factory. The lsm_config() call copies
the contents of the structure - it does not retain a pointer to it.

<verbatim>
  typedef struct lsm_compress_factory lsm_compress_factory;
  struct lsm_compress_factory {
    void *pCtx;
    int (*xFactory)(void *pCtx, lsm_db *db, unsigned int iCompressionId);
    void (*xFree)(void *pCtx);
  };
</verbatim>

<p><span style=color:red> Explain how the xFactory hook works here. </span>

<h1 id=performance_tuning>6. Performance Tuning</h1>

<p> This section describes the various measures that can be taken in order to
fine-tune LSM in order to improve performance in specific circumstances.
Sub-section 6.1 contains a high-level overview of the 
<a href=#overview_of_lsm_architecture>system architecture</a>