Many hyperlinks are disabled.
Use anonymous login
to enable hyperlinks.
Changes In Branch range-delete Excluding Merge-Ins
This is equivalent to a diff from 45e59053e7 to 80abdbea2d
2012-10-15
| ||
19:36 | Merge range-delete branch back into trunk. check-in: a7de625f13 user: dan tags: trunk | |
19:34 | Fix a case in live-recovery from a writer crash. Leaf check-in: 80abdbea2d user: dan tags: range-delete | |
16:42 | Fix an assert() that can fail following an OOM condition. check-in: 8a713f17fc user: dan tags: range-delete | |
2012-10-03
| ||
20:05 | Add (partially working) code for deleting keys to lsm_tree.c. Required for range-deletes. check-in: daa6942834 user: dan tags: range-delete | |
09:24 | Minor changes to the lsmperf.tcl script. check-in: 45e59053e7 user: dan tags: trunk | |
2012-10-02
| ||
18:06 | Simplify the way new cursors are created. check-in: 63d8eea506 user: dan tags: trunk | |
Changes to lsm-test/lsmtest.h.
︙ | ︙ | |||
42 43 44 45 46 47 48 49 50 51 52 53 54 55 | DatabaseMethods const *pMethods; /* Database methods */ const char *zLibrary; /* Library name for tdb_open() */ }; struct DatabaseMethods { int (*xClose)(TestDb *); int (*xWrite)(TestDb *, void *, int , void *, int); int (*xDelete)(TestDb *, void *, int); int (*xFetch)(TestDb *, void *, int, void **, int *); int (*xScan)(TestDb *, void *, int, void *, int, void *, int, void (*)(void *, void *, int , void *, int) ); int (*xBegin)(TestDb *, int); int (*xCommit)(TestDb *, int); int (*xRollback)(TestDb *, int); | > | 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 | DatabaseMethods const *pMethods; /* Database methods */ const char *zLibrary; /* Library name for tdb_open() */ }; struct DatabaseMethods { int (*xClose)(TestDb *); int (*xWrite)(TestDb *, void *, int , void *, int); int (*xDelete)(TestDb *, void *, int); int (*xDeleteRange)(TestDb *, void *, int, void *, int); int (*xFetch)(TestDb *, void *, int, void **, int *); int (*xScan)(TestDb *, void *, int, void *, int, void *, int, void (*)(void *, void *, int , void *, int) ); int (*xBegin)(TestDb *, int); int (*xCommit)(TestDb *, int); int (*xRollback)(TestDb *, int); |
︙ | ︙ | |||
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 | void test_failed(void); char *testMallocPrintf(const char *zFormat, ...); char *testMallocVPrintf(const char *zFormat, va_list ap); int testGlobMatch(const char *zPattern, const char *zStr); void testScanCompare(TestDb *, TestDb *, int, void *, int, void *, int, int *); void *testMalloc(int); void *testRealloc(void *, int); void testFree(void *); /* testio.c */ int testVfsConfigureDb(TestDb *pDb); /* testfunc.c */ | > > | 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 | void test_failed(void); char *testMallocPrintf(const char *zFormat, ...); char *testMallocVPrintf(const char *zFormat, va_list ap); int testGlobMatch(const char *zPattern, const char *zStr); void testScanCompare(TestDb *, TestDb *, int, void *, int, void *, int, int *); void testFetchCompare(TestDb *, TestDb *, void *, int, int *); void *testMalloc(int); void *testMallocCopy(void *pCopy, int nByte); void *testRealloc(void *, int); void testFree(void *); /* testio.c */ int testVfsConfigureDb(TestDb *pDb); /* testfunc.c */ |
︙ | ︙ | |||
190 191 192 193 194 195 196 | void testWriteDatasource(TestDb *, Datasource *, int, int *); void testWriteDatasourceRange(TestDb *, Datasource *, int, int, int *); void testDeleteDatasource(TestDb *, Datasource *, int, int *); void testDeleteDatasourceRange(TestDb *, Datasource *, int, int, int *); /* test1.c */ | | < | | 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 | void testWriteDatasource(TestDb *, Datasource *, int, int *); void testWriteDatasourceRange(TestDb *, Datasource *, int, int, int *); void testDeleteDatasource(TestDb *, Datasource *, int, int *); void testDeleteDatasourceRange(TestDb *, Datasource *, int, int, int *); /* test1.c */ void test_data_1(const char *, const char *, int *pRc); void test_data_2(const char *, const char *, int *pRc); void testDbContents(TestDb *, Datasource *, int, int, int, int, int, int *); void testCaseProgress(int, int, int, int *); int testCaseNDot(void); typedef struct CksumDb CksumDb; CksumDb *testCksumArrayNew(Datasource *, int, int, int); char *testCksumArrayGet(CksumDb *, int); |
︙ | ︙ |
Changes to lsm-test/lsmtest1.c.
1 2 3 4 5 6 | #include "lsmtest.h" #define DATA_SEQUENTIAL TEST_DATASOURCE_SEQUENCE #define DATA_RANDOM TEST_DATASOURCE_RANDOM | | | < < < < < | | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 | #include "lsmtest.h" #define DATA_SEQUENTIAL TEST_DATASOURCE_SEQUENCE #define DATA_RANDOM TEST_DATASOURCE_RANDOM typedef struct Datatest1 Datatest1; typedef struct Datatest2 Datatest2; /* ** An instance of the following structure contains parameters used to ** customize the test function in this file. Test procedure: ** ** 1. Create a data-source based on the "datasource definition" vars. ** ** 2. Insert nRow key value pairs into the database. ** ** 3. Delete all keys from the database. Deletes are done in the same ** order as the inserts. ** ** During steps 2 and 3 above, after each Datatest1.nVerify inserts or ** deletes, the following: ** ** a. Run Datasource.nTest key lookups and check the results are as expected. ** ** b. If Datasource.bTestScan is true, run a handful (8) of range ** queries (scanning forwards and backwards). Check that the results ** are as expected. ** ** c. Close and reopen the database. Then run (a) and (b) again. */ struct Datatest1 { /* Datasource definition */ DatasourceDefn defn; /* Test procedure parameters */ int nRow; /* Number of rows to insert then delete */ int nVerify; /* How often to verify the db contents */ int nTest; /* Number of keys to test (0==all) */ int bTestScan; /* True to do scan tests */ }; /* ** An instance of the following data structure is used to describe the ** second type of test case in this file. The chief difference between ** these tests and those described by Datatest1 is that these tests also ** experiment with range-delete operations. Tests proceed as follows: ** ** 1. Open the datasource described by Datatest2.defn. ** ** 2. Open a connection on an empty database. ** ** 3. Do this Datatest2.nIter times: ** ** a) Insert Datatest2.nWrite key-value pairs from the datasource. ** ** b) Select two pseudo-random keys and use them as the start ** and end points of a range-delete operation. ** ** c) Verify that the contents of the database are as expected (see ** below for details). ** ** d) Close and then reopen the database handle. ** ** e) Verify that the contents of the database are still as expected. ** ** The inserts and range deletes are run twice - once on the database being ** tested and once using a control system (sqlite3, kc etc. - something that ** works). In order to verify that the contents of the db being tested are ** correct, the test runs a bunch of scans and lookups on both the test and ** control databases. If the results are the same, the test passes. */ struct Datatest2 { DatasourceDefn defn; int nRange; int nWrite; /* Number of writes per iteration */ int nIter; /* Total number of iterations to run */ }; /* ** Generate a unique name for the test case pTest with database system ** zSystem. */ static char *getName(const char *zSystem, Datatest1 *pTest){ char *zRet; char *zData; zData = testDatasourceName(&pTest->defn); zRet = testMallocPrintf("data.%s.%s.%d.%d", zSystem, zData, pTest->nRow, pTest->nVerify ); testFree(zData); |
︙ | ︙ | |||
211 212 213 214 215 216 217 | void *pCtx, void *pKey, int nKey, void *pVal, int nVal ){ printf("%s\n", (char *)pKey); fflush(stdout); } #endif | | | | 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 | void *pCtx, void *pKey, int nKey, void *pVal, int nVal ){ printf("%s\n", (char *)pKey); fflush(stdout); } #endif static void doDataTest1( const char *zSystem, /* Database system to test */ Datatest1 *p, /* Structure containing test parameters */ int *pRc /* OUT: Error code */ ){ int i; int iDot; int rc = LSM_OK; Datasource *pData; TestDb *pDb; |
︙ | ︙ | |||
276 277 278 279 280 281 282 | testDatasourceFree(pData); tdb_close(pDb); testCaseFinish(rc); *pRc = rc; } | | | | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 | testDatasourceFree(pData); tdb_close(pDb); testCaseFinish(rc); *pRc = rc; } void test_data_1( const char *zSystem, /* Database system name */ const char *zPattern, /* Run test cases that match this pattern */ int *pRc /* IN/OUT: Error code */ ){ Datatest1 aTest[] = { { {DATA_RANDOM, 20,25, 100,200}, 1000, 250, 1000, 1}, { {DATA_RANDOM, 8,10, 100,200}, 1000, 250, 1000, 1}, { {DATA_RANDOM, 8,10, 10,20}, 1000, 250, 1000, 1}, { {DATA_RANDOM, 8,10, 1000,2000}, 1000, 250, 1000, 1}, { {DATA_RANDOM, 8,100, 10000,20000}, 100, 25, 100, 1}, { {DATA_RANDOM, 80,100, 10,20}, 1000, 250, 1000, 1}, { {DATA_RANDOM, 5000,6000, 10,20}, 100, 25, 100, 1}, { {DATA_SEQUENTIAL, 5,10, 10,20}, 1000, 250, 1000, 1}, { {DATA_SEQUENTIAL, 5,10, 100,200}, 1000, 250, 1000, 1}, { {DATA_SEQUENTIAL, 5,10, 1000,2000}, 1000, 250, 1000, 1}, { {DATA_SEQUENTIAL, 5,100, 10000,20000}, 100, 25, 100, 1}, { {DATA_RANDOM, 10,10, 100,100}, 100000, 1000, 100, 0}, { {DATA_SEQUENTIAL, 10,10, 100,100}, 100000, 1000, 100, 0}, }; int i; for(i=0; *pRc==LSM_OK && i<ArraySize(aTest); i++){ char *zName = getName(zSystem, &aTest[i]); if( testCaseBegin(pRc, zPattern, "%s", zName) ){ doDataTest1(zSystem, &aTest[i], pRc); } testFree(zName); } } static void testCompareDb( Datasource *pData, int nData, int iSeed, TestDb *pControl, TestDb *pDb, int *pRc ){ int i; testScanCompare(pControl, pDb, 0, 0, 0, 0, 0, pRc); testScanCompare(pControl, pDb, 1, 0, 0, 0, 0, pRc); if( *pRc==0 ){ int iKey1; int iKey2; void *pKey1; int nKey1; /* Start key */ void *pKey2; int nKey2; /* Final key */ iKey1 = testPrngValue(iSeed) % nData; iKey2 = testPrngValue(iSeed+1) % nData; testDatasourceEntry(pData, iKey1, &pKey2, &nKey1, 0, 0); pKey1 = testMalloc(nKey1+1); memcpy(pKey1, pKey2, nKey1+1); testDatasourceEntry(pData, iKey2, &pKey2, &nKey2, 0, 0); testScanCompare(pControl, pDb, 0, 0, 0, pKey2, nKey2, pRc); testScanCompare(pControl, pDb, 0, pKey1, nKey1, 0, 0, pRc); testScanCompare(pControl, pDb, 0, pKey1, nKey1, pKey2, nKey2, pRc); testScanCompare(pControl, pDb, 1, 0, 0, pKey2, nKey2, pRc); testScanCompare(pControl, pDb, 1, pKey1, nKey1, 0, 0, pRc); testScanCompare(pControl, pDb, 1, pKey1, nKey1, pKey2, nKey2, pRc); testFree(pKey1); } for(i=0; i<nData && *pRc==0; i++){ void *pKey; int nKey; testDatasourceEntry(pData, i, &pKey, &nKey, 0, 0); testFetchCompare(pControl, pDb, pKey, nKey, pRc); } } static void doDataTest2( const char *zSystem, /* Database system to test */ Datatest2 *p, /* Structure containing test parameters */ int *pRc /* OUT: Error code */ ){ TestDb *pDb; TestDb *pControl; Datasource *pData; int i; int rc = LSM_OK; int iDot = 0; /* Start the test case, open a database and allocate the datasource. */ pDb = testOpen(zSystem, 1, &rc); pData = testDatasourceNew(&p->defn); rc = testControlDb(&pControl); if( tdb_lsm(pDb) ){ int nBuf = 32 * 1024 * 1024; lsm_config(tdb_lsm(pDb), LSM_CONFIG_WRITE_BUFFER, &nBuf); } for(i=0; rc==0 && i<p->nIter; i++){ void *pKey1; int nKey1; void *pKey2; int nKey2; int ii; int nRange = MIN(p->nIter*p->nWrite, p->nRange); for(ii=0; rc==0 && ii<p->nWrite; ii++){ int iKey = (i*p->nWrite + ii) % p->nRange; testWriteDatasource(pControl, pData, iKey, &rc); testWriteDatasource(pDb, pData, iKey, &rc); } testDatasourceEntry(pData, i+1000000, &pKey1, &nKey1, 0, 0); pKey1 = testMallocCopy(pKey1, nKey1); testDatasourceEntry(pData, i+2000000, &pKey2, &nKey2, 0, 0); testDeleteRange(pDb, pKey1, nKey1, pKey2, nKey2, &rc); testDeleteRange(pControl, pKey1, nKey1, pKey2, nKey2, &rc); testFree(pKey1); testCompareDb(pData, nRange, i, pControl, pDb, &rc); testReopen(&pDb, &rc); testCompareDb(pData, nRange, i, pControl, pDb, &rc); /* Update the progress dots... */ testCaseProgress(i, p->nIter, testCaseNDot(), &iDot); } testClose(&pDb); testClose(&pControl); testDatasourceFree(pData); testCaseFinish(rc); *pRc = rc; } static char *getName2(const char *zSystem, Datatest2 *pTest){ char *zRet; char *zData; zData = testDatasourceName(&pTest->defn); zRet = testMallocPrintf("data2.%s.%s.%d.%d.%d", zSystem, zData, pTest->nRange, pTest->nWrite, pTest->nIter ); testFree(zData); return zRet; } void test_data_2( const char *zSystem, /* Database system name */ const char *zPattern, /* Run test cases that match this pattern */ int *pRc /* IN/OUT: Error code */ ){ Datatest2 aTest[] = { /* defn, nRange, nWrite, nIter */ { {DATA_RANDOM, 20,25, 100,200}, 10000, 10, 50 }, { {DATA_RANDOM, 20,25, 100,200}, 10000, 200, 50 }, { {DATA_RANDOM, 20,25, 100,200}, 100, 10, 1000 }, { {DATA_RANDOM, 20,25, 100,200}, 100, 200, 50 }, #if 0 { {DATA_RANDOM, 20,25, 100,200}, 200, 50 } #endif }; int i; for(i=0; *pRc==LSM_OK && i<ArraySize(aTest); i++){ char *zName = getName2(zSystem, &aTest[i]); if( testCaseBegin(pRc, zPattern, "%s", zName) ){ doDataTest2(zSystem, &aTest[i], pRc); } testFree(zName); } } |
Changes to lsm-test/lsmtest8.c.
︙ | ︙ | |||
301 302 303 304 305 306 307 | } void do_writer_crash_test(const char *zPattern, int *pRc){ struct Test { const char *zName; void (*xFunc)(int *); } aTest[] = { | < > | 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 | } void do_writer_crash_test(const char *zPattern, int *pRc){ struct Test { const char *zName; void (*xFunc)(int *); } aTest[] = { { "writercrash1.lsm", doWriterCrash1 }, { "writercrash2.lsm", doWriterCrash2 }, }; int i; for(i=0; i<ArraySize(aTest); i++){ struct Test *p = &aTest[i]; if( testCaseBegin(pRc, zPattern, p->zName) ){ p->xFunc(pRc); testCaseFinish(*pRc); |
︙ | ︙ |
Changes to lsm-test/lsmtest_main.c.
︙ | ︙ | |||
69 70 71 72 73 74 75 | void testDelete( TestDb *pDb, /* Database handle */ void *pKey, int nKey, /* Key to query database for */ int *pRc /* IN/OUT: Error code */ ){ if( *pRc==0 ){ int rc; | | > > > > > > > > > > > > | 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 | void testDelete( TestDb *pDb, /* Database handle */ void *pKey, int nKey, /* Key to query database for */ int *pRc /* IN/OUT: Error code */ ){ if( *pRc==0 ){ int rc; *pRc = rc = tdb_delete(pDb, pKey, nKey); testSetError(rc); } } void testDeleteRange( TestDb *pDb, /* Database handle */ void *pKey1, int nKey1, void *pKey2, int nKey2, int *pRc /* IN/OUT: Error code */ ){ if( *pRc==0 ){ int rc; *pRc = rc = tdb_delete_range(pDb, pKey1, nKey1, pKey2, nKey2); testSetError(rc); } } void testBegin(TestDb *pDb, int iTrans, int *pRc){ if( *pRc==0 ){ int rc; |
︙ | ︙ | |||
120 121 122 123 124 125 126 | const char *zVal, /* Value to write */ int *pRc /* IN/OUT: Error code */ ){ int nVal = (zVal ? strlen(zVal) : 0); testFetch(pDb, (void *)zKey, strlen(zKey), (void *)zVal, nVal, pRc); } | | > > > | 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 | const char *zVal, /* Value to write */ int *pRc /* IN/OUT: Error code */ ){ int nVal = (zVal ? strlen(zVal) : 0); testFetch(pDb, (void *)zKey, strlen(zKey), (void *)zVal, nVal, pRc); } void testFetchCompare( TestDb *pDb1, TestDb *pDb2, void *pKey, int nKey, int *pRc ){ int rc; void *pDbVal1; void *pDbVal2; int nDbVal1; int nDbVal2; static int nCall = 0; nCall++; rc = tdb_fetch(pDb1, pKey, nKey, &pDbVal1, &nDbVal1); testSetError(rc); rc = tdb_fetch(pDb2, pKey, nKey, &pDbVal2, &nDbVal2); testSetError(rc); |
︙ | ︙ | |||
181 182 183 184 185 186 187 | void *pVal, int nVal ){ ScanResult *p = (ScanResult *)pCtx; u8 *aKey = (u8 *)pKey; u8 *aVal = (u8 *)pVal; int i; | | > > > | 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 | void *pVal, int nVal ){ ScanResult *p = (ScanResult *)pCtx; u8 *aKey = (u8 *)pKey; u8 *aVal = (u8 *)pVal; int i; if( test_scan_debug ){ printf("%d: %.*s\n", p->nRow, nKey, (char *)pKey); fflush(stdout); } #if 0 if( test_scan_debug ) printf("%.20s\n", (char *)pVal); #endif #if 0 /* Check tdb_fetch() matches */ int rc = 0; |
︙ | ︙ | |||
369 370 371 372 373 374 375 376 377 378 379 380 381 382 | ** Allocations made using testMalloc() should be freed using testFree(). */ void *testMalloc(int n){ void *pRet = malloc(n); memset(pRet, 0, n); return pRet; } void *testRealloc(void *p, int n){ return realloc(p, n); } /* ** Free an allocation made by an earlier call to testMalloc(). | > > > > > > | 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 | ** Allocations made using testMalloc() should be freed using testFree(). */ void *testMalloc(int n){ void *pRet = malloc(n); memset(pRet, 0, n); return pRet; } void *testMallocCopy(void *pCopy, int nByte){ void *pRet = testMalloc(nByte); memcpy(pRet, pCopy, nByte); return pRet; } void *testRealloc(void *p, int n){ return realloc(p, n); } /* ** Free an allocation made by an earlier call to testMalloc(). |
︙ | ︙ | |||
432 433 434 435 436 437 438 | if( nArg==1 ){ zPattern = azArg[0]; } for(j=0; tdb_system_name(j); j++){ rc = 0; | | > | 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 | if( nArg==1 ){ zPattern = azArg[0]; } for(j=0; tdb_system_name(j); j++){ rc = 0; test_data_1(tdb_system_name(j), zPattern, &rc); test_data_2(tdb_system_name(j), zPattern, &rc); test_rollback(tdb_system_name(j), zPattern, &rc); test_mc(tdb_system_name(j), zPattern, &rc); test_mt(tdb_system_name(j), zPattern, &rc); if( rc ) nFail++; } |
︙ | ︙ |
Changes to lsm-test/lsmtest_tdb.c.
︙ | ︙ | |||
177 178 179 180 181 182 183 184 185 186 187 188 189 190 | } static int test_leveldb_open(const char *zFilename, int bClear, TestDb **ppDb){ static const DatabaseMethods LeveldbMethods = { test_leveldb_close, test_leveldb_write, test_leveldb_delete, test_leveldb_fetch, test_leveldb_scan, error_transaction_function, error_transaction_function, error_transaction_function }; | > | 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 | } static int test_leveldb_open(const char *zFilename, int bClear, TestDb **ppDb){ static const DatabaseMethods LeveldbMethods = { test_leveldb_close, test_leveldb_write, test_leveldb_delete, 0, test_leveldb_fetch, test_leveldb_scan, error_transaction_function, error_transaction_function, error_transaction_function }; |
︙ | ︙ | |||
307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 | ** (nOpenTrans-1) nested write transactions open. */ struct SqlDb { TestDb base; sqlite3 *db; sqlite3_stmt *pInsert; sqlite3_stmt *pDelete; sqlite3_stmt *pFetch; sqlite3_stmt *apScan[8]; int nOpenTrans; /* Used by sql_fetch() to allocate space for results */ int nAlloc; u8 *aAlloc; }; static int sql_close(TestDb *pTestDb){ SqlDb *pDb = (SqlDb *)pTestDb; sqlite3_finalize(pDb->pInsert); sqlite3_finalize(pDb->pDelete); sqlite3_finalize(pDb->pFetch); sqlite3_finalize(pDb->apScan[0]); sqlite3_finalize(pDb->apScan[1]); sqlite3_finalize(pDb->apScan[2]); sqlite3_finalize(pDb->apScan[3]); sqlite3_finalize(pDb->apScan[4]); sqlite3_finalize(pDb->apScan[5]); | > > | 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 | ** (nOpenTrans-1) nested write transactions open. */ struct SqlDb { TestDb base; sqlite3 *db; sqlite3_stmt *pInsert; sqlite3_stmt *pDelete; sqlite3_stmt *pDeleteRange; sqlite3_stmt *pFetch; sqlite3_stmt *apScan[8]; int nOpenTrans; /* Used by sql_fetch() to allocate space for results */ int nAlloc; u8 *aAlloc; }; static int sql_close(TestDb *pTestDb){ SqlDb *pDb = (SqlDb *)pTestDb; sqlite3_finalize(pDb->pInsert); sqlite3_finalize(pDb->pDelete); sqlite3_finalize(pDb->pDeleteRange); sqlite3_finalize(pDb->pFetch); sqlite3_finalize(pDb->apScan[0]); sqlite3_finalize(pDb->apScan[1]); sqlite3_finalize(pDb->apScan[2]); sqlite3_finalize(pDb->apScan[3]); sqlite3_finalize(pDb->apScan[4]); sqlite3_finalize(pDb->apScan[5]); |
︙ | ︙ | |||
356 357 358 359 360 361 362 363 364 365 366 367 368 369 | static int sql_delete(TestDb *pTestDb, void *pKey, int nKey){ SqlDb *pDb = (SqlDb *)pTestDb; sqlite3_bind_blob(pDb->pDelete, 1, pKey, nKey, SQLITE_STATIC); sqlite3_step(pDb->pDelete); return sqlite3_reset(pDb->pDelete); } static int sql_fetch( TestDb *pTestDb, void *pKey, int nKey, void **ppVal, int *pnVal | > > > > > > > > > > > > | 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 | static int sql_delete(TestDb *pTestDb, void *pKey, int nKey){ SqlDb *pDb = (SqlDb *)pTestDb; sqlite3_bind_blob(pDb->pDelete, 1, pKey, nKey, SQLITE_STATIC); sqlite3_step(pDb->pDelete); return sqlite3_reset(pDb->pDelete); } static int sql_delete_range( TestDb *pTestDb, void *pKey1, int nKey1, void *pKey2, int nKey2 ){ SqlDb *pDb = (SqlDb *)pTestDb; sqlite3_bind_blob(pDb->pDeleteRange, 1, pKey1, nKey1, SQLITE_STATIC); sqlite3_bind_blob(pDb->pDeleteRange, 2, pKey2, nKey2, SQLITE_STATIC); sqlite3_step(pDb->pDeleteRange); return sqlite3_reset(pDb->pDeleteRange); } static int sql_fetch( TestDb *pTestDb, void *pKey, int nKey, void **ppVal, int *pnVal |
︙ | ︙ | |||
508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 | } static int sql_open(const char *zFilename, int bClear, TestDb **ppDb){ static const DatabaseMethods SqlMethods = { sql_close, sql_write, sql_delete, sql_fetch, sql_scan, sql_begin, sql_commit, sql_rollback }; const char *zCreate = "CREATE TABLE IF NOT EXISTS t1(k PRIMARY KEY, v)"; const char *zInsert = "REPLACE INTO t1 VALUES(?, ?)"; const char *zDelete = "DELETE FROM t1 WHERE k = ?"; const char *zFetch = "SELECT v FROM t1 WHERE k = ?"; const char *zScan0 = "SELECT * FROM t1 WHERE k BETWEEN ?1 AND ?2 ORDER BY k"; const char *zScan1 = "SELECT * FROM t1 WHERE k <= ?2 ORDER BY k"; const char *zScan2 = "SELECT * FROM t1 WHERE k >= ?1 ORDER BY k"; const char *zScan3 = "SELECT * FROM t1 ORDER BY k"; | > > | 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 | } static int sql_open(const char *zFilename, int bClear, TestDb **ppDb){ static const DatabaseMethods SqlMethods = { sql_close, sql_write, sql_delete, sql_delete_range, sql_fetch, sql_scan, sql_begin, sql_commit, sql_rollback }; const char *zCreate = "CREATE TABLE IF NOT EXISTS t1(k PRIMARY KEY, v)"; const char *zInsert = "REPLACE INTO t1 VALUES(?, ?)"; const char *zDelete = "DELETE FROM t1 WHERE k = ?"; const char *zRange = "DELETE FROM t1 WHERE k>? AND k<?"; const char *zFetch = "SELECT v FROM t1 WHERE k = ?"; const char *zScan0 = "SELECT * FROM t1 WHERE k BETWEEN ?1 AND ?2 ORDER BY k"; const char *zScan1 = "SELECT * FROM t1 WHERE k <= ?2 ORDER BY k"; const char *zScan2 = "SELECT * FROM t1 WHERE k >= ?1 ORDER BY k"; const char *zScan3 = "SELECT * FROM t1 ORDER BY k"; |
︙ | ︙ | |||
546 547 548 549 550 551 552 553 554 555 556 557 558 559 | memset(pDb, 0, sizeof(SqlDb)); pDb->base.pMethods = &SqlMethods; if( 0!=(rc = sqlite3_open(zFilename, &pDb->db)) || 0!=(rc = sqlite3_exec(pDb->db, zCreate, 0, 0, 0)) || 0!=(rc = sqlite3_prepare_v2(pDb->db, zInsert, -1, &pDb->pInsert, 0)) || 0!=(rc = sqlite3_prepare_v2(pDb->db, zDelete, -1, &pDb->pDelete, 0)) || 0!=(rc = sqlite3_prepare_v2(pDb->db, zFetch, -1, &pDb->pFetch, 0)) || 0!=(rc = sqlite3_prepare_v2(pDb->db, zScan0, -1, &pDb->apScan[0], 0)) || 0!=(rc = sqlite3_prepare_v2(pDb->db, zScan1, -1, &pDb->apScan[1], 0)) || 0!=(rc = sqlite3_prepare_v2(pDb->db, zScan2, -1, &pDb->apScan[2], 0)) || 0!=(rc = sqlite3_prepare_v2(pDb->db, zScan3, -1, &pDb->apScan[3], 0)) || 0!=(rc = sqlite3_prepare_v2(pDb->db, zScan4, -1, &pDb->apScan[4], 0)) || 0!=(rc = sqlite3_prepare_v2(pDb->db, zScan5, -1, &pDb->apScan[5], 0)) | > | 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 | memset(pDb, 0, sizeof(SqlDb)); pDb->base.pMethods = &SqlMethods; if( 0!=(rc = sqlite3_open(zFilename, &pDb->db)) || 0!=(rc = sqlite3_exec(pDb->db, zCreate, 0, 0, 0)) || 0!=(rc = sqlite3_prepare_v2(pDb->db, zInsert, -1, &pDb->pInsert, 0)) || 0!=(rc = sqlite3_prepare_v2(pDb->db, zDelete, -1, &pDb->pDelete, 0)) || 0!=(rc = sqlite3_prepare_v2(pDb->db, zRange, -1, &pDb->pDeleteRange, 0)) || 0!=(rc = sqlite3_prepare_v2(pDb->db, zFetch, -1, &pDb->pFetch, 0)) || 0!=(rc = sqlite3_prepare_v2(pDb->db, zScan0, -1, &pDb->apScan[0], 0)) || 0!=(rc = sqlite3_prepare_v2(pDb->db, zScan1, -1, &pDb->apScan[1], 0)) || 0!=(rc = sqlite3_prepare_v2(pDb->db, zScan2, -1, &pDb->apScan[2], 0)) || 0!=(rc = sqlite3_prepare_v2(pDb->db, zScan3, -1, &pDb->apScan[3], 0)) || 0!=(rc = sqlite3_prepare_v2(pDb->db, zScan4, -1, &pDb->apScan[4], 0)) || 0!=(rc = sqlite3_prepare_v2(pDb->db, zScan5, -1, &pDb->apScan[5], 0)) |
︙ | ︙ | |||
644 645 646 647 648 649 650 651 652 653 654 655 656 657 | int tdb_write(TestDb *pDb, void *pKey, int nKey, void *pVal, int nVal){ return pDb->pMethods->xWrite(pDb, pKey, nKey, pVal, nVal); } int tdb_delete(TestDb *pDb, void *pKey, int nKey){ return pDb->pMethods->xDelete(pDb, pKey, nKey); } int tdb_fetch(TestDb *pDb, void *pKey, int nKey, void **ppVal, int *pnVal){ return pDb->pMethods->xFetch(pDb, pKey, nKey, ppVal, pnVal); } int tdb_scan( TestDb *pDb, /* Database handle */ | > > > > > > | 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 | int tdb_write(TestDb *pDb, void *pKey, int nKey, void *pVal, int nVal){ return pDb->pMethods->xWrite(pDb, pKey, nKey, pVal, nVal); } int tdb_delete(TestDb *pDb, void *pKey, int nKey){ return pDb->pMethods->xDelete(pDb, pKey, nKey); } int tdb_delete_range( TestDb *pDb, void *pKey1, int nKey1, void *pKey2, int nKey2 ){ return pDb->pMethods->xDeleteRange(pDb, pKey1, nKey1, pKey2, nKey2); } int tdb_fetch(TestDb *pDb, void *pKey, int nKey, void **ppVal, int *pnVal){ return pDb->pMethods->xFetch(pDb, pKey, nKey, ppVal, pnVal); } int tdb_scan( TestDb *pDb, /* Database handle */ |
︙ | ︙ |
Changes to lsm-test/lsmtest_tdb.h.
︙ | ︙ | |||
46 47 48 49 50 51 52 53 54 55 56 57 58 59 | int tdb_write(TestDb *pDb, void *pKey, int nKey, void *pVal, int nVal); /* ** Delete a key from the database. */ int tdb_delete(TestDb *pDb, void *pKey, int nKey); /* ** Query the database for key (pKey/nKey). If no entry is found, set *ppVal ** to 0 and *pnVal to -1 before returning. Otherwise, set *ppVal and *pnVal ** to a pointer to and size of the value associated with (pKey/nKey). */ int tdb_fetch(TestDb *pDb, void *pKey, int nKey, void **ppVal, int *pnVal); | > > > > > | 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 | int tdb_write(TestDb *pDb, void *pKey, int nKey, void *pVal, int nVal); /* ** Delete a key from the database. */ int tdb_delete(TestDb *pDb, void *pKey, int nKey); /* ** Delete a range of keys from the database. */ int tdb_delete_range(TestDb *, void *pKey1, int nKey1, void *pKey2, int nKey2); /* ** Query the database for key (pKey/nKey). If no entry is found, set *ppVal ** to 0 and *pnVal to -1 before returning. Otherwise, set *ppVal and *pnVal ** to a pointer to and size of the value associated with (pKey/nKey). */ int tdb_fetch(TestDb *pDb, void *pKey, int nKey, void **ppVal, int *pnVal); |
︙ | ︙ |
Changes to lsm-test/lsmtest_tdb3.c.
︙ | ︙ | |||
450 451 452 453 454 455 456 457 458 459 460 461 462 463 | return lsm_write(pDb->db, pKey, nKey, pVal, nVal); } static int test_lsm_delete(TestDb *pTestDb, void *pKey, int nKey){ LsmDb *pDb = (LsmDb *)pTestDb; return lsm_delete(pDb->db, pKey, nKey); } static int test_lsm_fetch( TestDb *pTestDb, void *pKey, int nKey, void **ppVal, int *pnVal | > > > > > > > > > | 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 | return lsm_write(pDb->db, pKey, nKey, pVal, nVal); } static int test_lsm_delete(TestDb *pTestDb, void *pKey, int nKey){ LsmDb *pDb = (LsmDb *)pTestDb; return lsm_delete(pDb->db, pKey, nKey); } static int test_lsm_delete_range( TestDb *pTestDb, void *pKey1, int nKey1, void *pKey2, int nKey2 ){ LsmDb *pDb = (LsmDb *)pTestDb; return lsm_delete_range(pDb->db, pKey1, nKey1, pKey2, nKey2); } static int test_lsm_fetch( TestDb *pTestDb, void *pKey, int nKey, void **ppVal, int *pnVal |
︙ | ︙ | |||
726 727 728 729 730 731 732 733 734 735 736 737 738 739 | int bClear, TestDb **ppDb ){ static const DatabaseMethods LsmMethods = { test_lsm_close, test_lsm_write, test_lsm_delete, test_lsm_fetch, test_lsm_scan, test_lsm_begin, test_lsm_commit, test_lsm_rollback }; | > | 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 | int bClear, TestDb **ppDb ){ static const DatabaseMethods LsmMethods = { test_lsm_close, test_lsm_write, test_lsm_delete, test_lsm_delete_range, test_lsm_fetch, test_lsm_scan, test_lsm_begin, test_lsm_commit, test_lsm_rollback }; |
︙ | ︙ | |||
1111 1112 1113 1114 1115 1116 1117 | } } return rc; } | < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < > | > | < | 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 | } } return rc; } int test_lsm_mt2(const char *zFilename, int bClear, TestDb **ppDb){ const char *zCfg = "threads=2"; return testLsmOpen(zCfg, zFilename, bClear, ppDb); } int test_lsm_mt3(const char *zFilename, int bClear, TestDb **ppDb){ const char *zCfg = "threads=3"; return testLsmOpen(zCfg, zFilename, bClear, ppDb); } #else static void mt_shutdown(LsmDb *pDb) { unused_parameter(pDb); } int test_lsm_mt(const char *zFilename, int bClear, TestDb **ppDb){ unused_parameter(zFilename); unused_parameter(bClear); unused_parameter(ppDb); testPrintError("threads unavailable - recompile with LSM_MUTEX_PTHREADS\n"); return 1; } #endif |
Changes to src/lsm.h.
︙ | ︙ | |||
376 377 378 379 380 381 382 | /* ** Delete a value from the database. No error is returned if the specified ** key value does not exist in the database. */ int lsm_delete(lsm_db *, const void *pKey, int nKey); /* | | | > > > | 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 | /* ** Delete a value from the database. No error is returned if the specified ** key value does not exist in the database. */ int lsm_delete(lsm_db *, const void *pKey, int nKey); /* ** Delete all database entries with keys that are greater than (pKey1/nKey1) ** and smaller than (pKey2/nKey2). Note that keys (pKey1/nKey1) and ** (pKey2/nKey2) themselves, if they exist in the database, are not deleted. ** ** Return LSM_OK if successful, or an LSM error code otherwise. */ int lsm_delete_range(lsm_db *, const void *pKey1, int nKey1, const void *pKey2, int nKey2 ); /* ** The lsm_tree_size() function reports on the current state of the |
︙ | ︙ |
Changes to src/lsmInt.h.
︙ | ︙ | |||
143 144 145 146 147 148 149 150 151 152 153 154 155 156 | ** a checkpoint (the remainder are stored as a system record in the LSM). ** See also LSM_CONFIG_MAX_FREELIST. */ #define LSM_MAX_FREELIST_ENTRIES 100 #define LSM_ATTEMPTS_BEFORE_PROTOCOL 10000 /* ** A string that can grow by appending. */ struct LsmString { lsm_env *pEnv; /* Run-time environment */ int n; /* Size of string. -1 indicates error */ int nAlloc; /* Space allocated for z[] */ | > > > > > > > > > > > > | 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 | ** a checkpoint (the remainder are stored as a system record in the LSM). ** See also LSM_CONFIG_MAX_FREELIST. */ #define LSM_MAX_FREELIST_ENTRIES 100 #define LSM_ATTEMPTS_BEFORE_PROTOCOL 10000 /* ** Each entry stored in the LSM (or in-memory tree structure) has an ** associated mask of the following flags. */ #define LSM_START_DELETE 0x01 /* Start of open-ended delete range */ #define LSM_END_DELETE 0x02 /* End of open-ended delete range */ #define LSM_POINT_DELETE 0x04 /* Delete this key */ #define LSM_INSERT 0x08 /* Insert this key and value */ #define LSM_SEPARATOR 0x10 /* True if entry is separator key only */ #define LSM_SYSTEMKEY 0x20 /* True if entry is a system key (FREELIST) */ /* ** A string that can grow by appending. */ struct LsmString { lsm_env *pEnv; /* Run-time environment */ int n; /* Size of string. -1 indicates error */ int nAlloc; /* Space allocated for z[] */ |
︙ | ︙ | |||
374 375 376 377 378 379 380 381 382 383 384 385 386 387 | }; struct Merge { int nInput; /* Number of input runs being merged */ MergeInput *aInput; /* Array nInput entries in size */ MergeInput splitkey; /* Location in file of current splitkey */ int nSkip; /* Number of separators entries to skip */ int iOutputOff; /* Write offset on output page */ int bHierReadonly; /* True if b-tree heirarchies are read-only */ }; /* ** The first argument to this macro is a pointer to a Segment structure. ** Returns true if the structure instance indicates that the separators ** array is valid. | > | 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 | }; struct Merge { int nInput; /* Number of input runs being merged */ MergeInput *aInput; /* Array nInput entries in size */ MergeInput splitkey; /* Location in file of current splitkey */ int nSkip; /* Number of separators entries to skip */ int iOutputOff; /* Write offset on output page */ Pgno iCurrentPtr; /* Current pointer value */ int bHierReadonly; /* True if b-tree heirarchies are read-only */ }; /* ** The first argument to this macro is a pointer to a Segment structure. ** Returns true if the structure instance indicates that the separators ** array is valid. |
︙ | ︙ | |||
539 540 541 542 543 544 545 | void lsmTreeCursorDestroy(TreeCursor *); int lsmTreeCursorSeek(TreeCursor *pCsr, void *pKey, int nKey, int *pRes); int lsmTreeCursorNext(TreeCursor *pCsr); int lsmTreeCursorPrev(TreeCursor *pCsr); int lsmTreeCursorEnd(TreeCursor *pCsr, int bLast); void lsmTreeCursorReset(TreeCursor *pCsr); | | > > | 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 | void lsmTreeCursorDestroy(TreeCursor *); int lsmTreeCursorSeek(TreeCursor *pCsr, void *pKey, int nKey, int *pRes); int lsmTreeCursorNext(TreeCursor *pCsr); int lsmTreeCursorPrev(TreeCursor *pCsr); int lsmTreeCursorEnd(TreeCursor *pCsr, int bLast); void lsmTreeCursorReset(TreeCursor *pCsr); int lsmTreeCursorKey(TreeCursor *pCsr, int *pFlags, void **ppKey, int *pnKey); int lsmTreeCursorFlags(TreeCursor *pCsr); int lsmTreeCursorValue(TreeCursor *pCsr, void **ppVal, int *pnVal); int lsmTreeCursorValid(TreeCursor *pCsr); int lsmTreeCursorSave(TreeCursor *pCsr); void lsmFlagsToString(int flags, char *zFlags); /* ** Functions from file "mem.c". */ int lsmPoolNew(lsm_env *pEnv, Mempool **ppPool); void lsmPoolDestroy(lsm_env *pEnv, Mempool *pPool); void *lsmPoolMalloc(lsm_env *pEnv, Mempool *pPool, int nByte); |
︙ | ︙ |
Changes to src/lsm_ckpt.c.
︙ | ︙ | |||
60 61 62 63 64 65 66 67 68 69 70 71 72 73 | ** 4. If nRight>0, The number of segments involved in the merge ** 5. if nRight>0, Current nSkip value (see Merge structure defn.), ** 6. For each segment in the merge: ** 5a. Page number of next cell to read during merge ** 5b. Cell number of next cell to read during merge ** 7. Page containing current split-key. ** 8. Cell within page containing current split-key. ** ** The freelist. ** ** 1. Number of free-list entries stored in checkpoint header. ** 2. For each entry: ** 2a. Block number of free block. ** 2b. MSW of associated checkpoint id. | > | 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 | ** 4. If nRight>0, The number of segments involved in the merge ** 5. if nRight>0, Current nSkip value (see Merge structure defn.), ** 6. For each segment in the merge: ** 5a. Page number of next cell to read during merge ** 5b. Cell number of next cell to read during merge ** 7. Page containing current split-key. ** 8. Cell within page containing current split-key. ** 9. Current pointer value. ** ** The freelist. ** ** 1. Number of free-list entries stored in checkpoint header. ** 2. For each entry: ** 2a. Block number of free block. ** 2b. MSW of associated checkpoint id. |
︙ | ︙ | |||
308 309 310 311 312 313 314 315 316 317 318 319 320 321 | ckptSetValue(p, iOut++, pMerge->nSkip, pRc); for(i=0; i<pMerge->nInput; i++){ ckptSetValue(p, iOut++, pMerge->aInput[i].iPg, pRc); ckptSetValue(p, iOut++, pMerge->aInput[i].iCell, pRc); } ckptSetValue(p, iOut++, pMerge->splitkey.iPg, pRc); ckptSetValue(p, iOut++, pMerge->splitkey.iCell, pRc); } *piOut = iOut; } /* ** Populate the log offset fields of the checkpoint buffer. 4 values. | > | 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 | ckptSetValue(p, iOut++, pMerge->nSkip, pRc); for(i=0; i<pMerge->nInput; i++){ ckptSetValue(p, iOut++, pMerge->aInput[i].iPg, pRc); ckptSetValue(p, iOut++, pMerge->aInput[i].iCell, pRc); } ckptSetValue(p, iOut++, pMerge->splitkey.iPg, pRc); ckptSetValue(p, iOut++, pMerge->splitkey.iCell, pRc); ckptSetValue(p, iOut++, pMerge->iCurrentPtr, pRc); } *piOut = iOut; } /* ** Populate the log offset fields of the checkpoint buffer. 4 values. |
︙ | ︙ | |||
497 498 499 500 501 502 503 504 505 506 507 508 509 510 | pMerge->nSkip = (int)aInt[iIn++]; for(i=0; i<nInput; i++){ pMerge->aInput[i].iPg = (Pgno)aInt[iIn++]; pMerge->aInput[i].iCell = (int)aInt[iIn++]; } pMerge->splitkey.iPg = (Pgno)aInt[iIn++]; pMerge->splitkey.iCell = (int)aInt[iIn++]; /* Set *piIn and return LSM_OK. */ *piIn = iIn; return LSM_OK; } | > | 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 | pMerge->nSkip = (int)aInt[iIn++]; for(i=0; i<nInput; i++){ pMerge->aInput[i].iPg = (Pgno)aInt[iIn++]; pMerge->aInput[i].iCell = (int)aInt[iIn++]; } pMerge->splitkey.iPg = (Pgno)aInt[iIn++]; pMerge->splitkey.iCell = (int)aInt[iIn++]; pMerge->iCurrentPtr = (int)aInt[iIn++]; /* Set *piIn and return LSM_OK. */ *piIn = iIn; return LSM_OK; } |
︙ | ︙ |
Changes to src/lsm_file.c.
︙ | ︙ | |||
725 726 727 728 729 730 731 732 733 734 735 736 737 738 | Page **ppPg /* OUT: New page handle */ ){ Page *p; int iHash; int rc = LSM_OK; assert( iPg>=fsFirstPageOnBlock(pFS, 1) ); if( pFS->bUseMmap ){ i64 iEnd = (i64)iPg * pFS->nPagesize; fsGrowMapping(pFS, iEnd, &rc); if( rc!=LSM_OK ) return rc; if( pFS->pFree ){ | > | 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 | Page **ppPg /* OUT: New page handle */ ){ Page *p; int iHash; int rc = LSM_OK; assert( iPg>=fsFirstPageOnBlock(pFS, 1) ); *ppPg = 0; if( pFS->bUseMmap ){ i64 iEnd = (i64)iPg * pFS->nPagesize; fsGrowMapping(pFS, iEnd, &rc); if( rc!=LSM_OK ) return rc; if( pFS->pFree ){ |
︙ | ︙ |
Changes to src/lsm_main.c.
︙ | ︙ | |||
475 476 477 478 479 480 481 | break; } va_end(ap); return rc; } | < < < | | > > | > > > > > > | > > > > > > > > > > > > > | | > > > > > > > > > > > > > > > | 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 | break; } va_end(ap); return rc; } static int doWriteOp( lsm_db *pDb, int bDeleteRange, const void *pKey, int nKey, /* Key to write or delete */ const void *pVal, int nVal /* Value to write. Or nVal==-1 for a delete */ ){ int rc = LSM_OK; /* Return code */ int bCommit = 0; /* True to commit before returning */ if( pDb->nTransOpen==0 ){ bCommit = 1; rc = lsm_begin(pDb, 1); } if( rc==LSM_OK ){ if( bDeleteRange==0 ){ rc = lsmLogWrite(pDb, (void *)pKey, nKey, (void *)pVal, nVal); }else{ /* TODO */ } } lsmSortedSaveTreeCursors(pDb); if( rc==LSM_OK ){ int pgsz = lsmFsPageSize(pDb->pFS); int nQuant = LSM_AUTOWORK_QUANT * pgsz; int nBefore; int nAfter; int nDiff; if( nQuant>pDb->nTreeLimit ){ nQuant = pDb->nTreeLimit; } nBefore = lsmTreeSize(pDb); if( bDeleteRange ){ rc = lsmTreeDelete(pDb, (void *)pKey, nKey, (void *)pVal, nVal); }else{ rc = lsmTreeInsert(pDb, (void *)pKey, nKey, (void *)pVal, nVal); } nAfter = lsmTreeSize(pDb); nDiff = (nAfter/nQuant) - (nBefore/nQuant); if( rc==LSM_OK && pDb->bAutowork && nDiff!=0 ){ rc = lsmSortedAutoWork(pDb, nDiff * LSM_AUTOWORK_QUANT); } } /* If a transaction was opened at the start of this function, commit it. ** Or, if an error has occurred, roll it back. */ if( bCommit ){ if( rc==LSM_OK ){ rc = lsm_commit(pDb, 0); }else{ lsm_rollback(pDb, 0); } } return rc; } /* ** Write a new value into the database. */ int lsm_write( lsm_db *db, /* Database connection */ const void *pKey, int nKey, /* Key to write or delete */ const void *pVal, int nVal /* Value to write. Or nVal==-1 for a delete */ ){ return doWriteOp(db, 0, pKey, nKey, pVal, nVal); } /* ** Delete a value from the database. */ int lsm_delete(lsm_db *db, const void *pKey, int nKey){ return doWriteOp(db, 0, pKey, nKey, 0, -1); } /* ** Delete a range of database keys. */ int lsm_delete_range( lsm_db *db, /* Database handle */ const void *pKey1, int nKey1, /* Lower bound of range to delete */ const void *pKey2, int nKey2 /* Upper bound of range to delete */ ){ int rc = LSM_OK; if( db->xCmp((void *)pKey1, nKey1, (void *)pKey2, nKey2)<0 ){ rc = doWriteOp(db, 1, pKey1, nKey1, pKey2, nKey2); } return rc; } /* ** Open a new cursor handle. ** ** If there are currently no other open cursor handles, and no open write ** transaction, open a read transaction here. |
︙ | ︙ |
Changes to src/lsm_shared.c.
︙ | ︙ | |||
631 632 633 634 635 636 637 | /* ** Argument bFlush is true if the contents of the in-memory tree has just ** been flushed to disk. The significance of this is that once the snapshot ** created to hold the updated state of the database is synced to disk, log ** file space can be recycled. */ void lsmFinishWork(lsm_db *pDb, int bFlush, int nOvfl, int *pRc){ | > > | | < < | | | | < < | 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 | /* ** Argument bFlush is true if the contents of the in-memory tree has just ** been flushed to disk. The significance of this is that once the snapshot ** created to hold the updated state of the database is synced to disk, log ** file space can be recycled. */ void lsmFinishWork(lsm_db *pDb, int bFlush, int nOvfl, int *pRc){ assert( *pRc!=0 || pDb->pWorker ); if( pDb->pWorker ){ /* If no error has occurred, serialize the worker snapshot and write ** it to shared memory. */ assert( pDb->pWorker->nFreelistOvfl==0 || nOvfl==0 ); if( *pRc==LSM_OK ){ *pRc = lsmCheckpointSaveWorker(pDb, bFlush, nOvfl); } lsmFreeSnapshot(pDb->pEnv, pDb->pWorker); pDb->pWorker = 0; } lsmShmLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_UNLOCK, 0); } |
︙ | ︙ |
Changes to src/lsm_sorted.c.
︙ | ︙ | |||
38 39 40 41 42 43 44 | ** (N==0). And on most pages the first record that starts on the page will ** not start at byte offset 0. For example: ** ** aaaaa bbbbb ccc <footer> cc eeeee fffff g <footer> gggg.... ** ** RECORD FORMAT: ** | < < < | > > | > | | > < | | | | | | | < < < < < < < < < < < < < < < < < < < < < < < < < < < < < | | > | | > | 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 | ** (N==0). And on most pages the first record that starts on the page will ** not start at byte offset 0. For example: ** ** aaaaa bbbbb ccc <footer> cc eeeee fffff g <footer> gggg.... ** ** RECORD FORMAT: ** ** The first byte of the record is a flags byte. It is a combination ** of the following flags (defined in lsmInt.h): ** ** LSM_START_DELETE ** LSM_END_DELETE ** LSM_POINT_DELETE ** LSM_INSERT ** LSM_SEPARATOR ** LSM_SYSTEMKEY ** ** Immediately following the type byte is a pointer to the smallest key ** in the next file that is larger than the key in the current record. The ** pointer is encoded as a varint. When added to the 32-bit page number ** stored in the footer, it is the page number of the page that contains the ** smallest key in the next sorted file that is larger than this key. ** ** Next is the number of bytes in the key, encoded as a varint. ** ** If the LSM_INSERT flag is set, the number of bytes in the value, as ** a varint, is next. ** ** Finally, the blob of data containing the key, and for LSM_INSERT ** records, the value as well. */ #ifndef _LSM_INT_H # include "lsmInt.h" #endif #include "sqlite4.h" /* only for sqlite4_snprintf() */ /* ** Macros to help decode record types. */ #define rtTopic(eType) ((eType) & LSM_SYSTEMKEY) #define rtIsDelete(eType) (((eType) & 0x0F)==LSM_POINT_DELETE) #define rtIsSeparator(eType) (((eType) & LSM_SEPARATOR)!=0) #define rtIsWrite(eType) (((eType) & LSM_INSERT)!=0) #define rtIsSystem(eType) (((eType) & LSM_SYSTEMKEY)!=0) /* ** The following macros are used to access a page footer. */ #define SEGMENT_NRECORD_OFFSET(pgsz) ((pgsz) - 2) #define SEGMENT_FLAGS_OFFSET(pgsz) ((pgsz) - 2 - 2) #define SEGMENT_POINTER_OFFSET(pgsz) ((pgsz) - 2 - 2 - 4) |
︙ | ︙ | |||
230 231 232 233 234 235 236 237 238 239 240 241 242 243 | /* Comparison results */ int nTree; /* Size of aTree[] array */ int *aTree; /* Array of comparison results */ /* Used by cursors flushing the in-memory tree only */ int *pnOvfl; /* Number of free-list entries to store */ void *pSystemVal; /* Pointer to buffer to free */ }; #define CURSOR_DATA_TREE0 0 /* Current tree cursor */ #define CURSOR_DATA_TREE1 1 /* The "old" tree, if any */ #define CURSOR_DATA_SYSTEM 2 #define CURSOR_DATA_SEGMENT 3 | > > > | 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 | /* Comparison results */ int nTree; /* Size of aTree[] array */ int *aTree; /* Array of comparison results */ /* Used by cursors flushing the in-memory tree only */ int *pnOvfl; /* Number of free-list entries to store */ void *pSystemVal; /* Pointer to buffer to free */ /* Used by worker cursors only */ Pgno *pPrevMergePtr; }; #define CURSOR_DATA_TREE0 0 /* Current tree cursor */ #define CURSOR_DATA_TREE1 1 /* The "old" tree, if any */ #define CURSOR_DATA_SYSTEM 2 #define CURSOR_DATA_SEGMENT 3 |
︙ | ︙ | |||
264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 | ** ** CURSOR_PREV_OK ** Set if it is Ok to call lsm_csr_prev(). ** ** CURSOR_READ_SEPARATORS ** Set if this cursor should visit the separator keys in segment ** aPtr[nPtr-1]. */ #define CURSOR_IGNORE_DELETE 0x00000001 #define CURSOR_NEW_SYSTEM 0x00000002 #define CURSOR_AT_FREELIST 0x00000004 #define CURSOR_IGNORE_SYSTEM 0x00000010 #define CURSOR_NEXT_OK 0x00000020 #define CURSOR_PREV_OK 0x00000040 #define CURSOR_READ_SEPARATORS 0x00000080 typedef struct MergeWorker MergeWorker; typedef struct Hierarchy Hierarchy; struct Hierarchy { Page **apHier; int nHier; | > > > > > > | 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 | ** ** CURSOR_PREV_OK ** Set if it is Ok to call lsm_csr_prev(). ** ** CURSOR_READ_SEPARATORS ** Set if this cursor should visit the separator keys in segment ** aPtr[nPtr-1]. ** ** CURSOR_SEEK_EQ ** Cursor has undergone a successful lsm_csr_seek(LSM_SEEK_EQ) operation. ** The key and value are stored in MultiCursor.key and MultiCursor.val ** respectively. */ #define CURSOR_IGNORE_DELETE 0x00000001 #define CURSOR_NEW_SYSTEM 0x00000002 #define CURSOR_AT_FREELIST 0x00000004 #define CURSOR_IGNORE_SYSTEM 0x00000010 #define CURSOR_NEXT_OK 0x00000020 #define CURSOR_PREV_OK 0x00000040 #define CURSOR_READ_SEPARATORS 0x00000080 #define CURSOR_SEEK_EQ 0x00000100 typedef struct MergeWorker MergeWorker; typedef struct Hierarchy Hierarchy; struct Hierarchy { Page **apHier; int nHier; |
︙ | ︙ | |||
610 611 612 613 614 615 616 | } if( iPg<0 || iCell<0 ) return LSM_CORRUPT_BKPT; rc = pageGetBtreeKey( pCsr->aPg[iPg].pPage, iCell, &dummy, &pCsr->eType, &pCsr->pKey, &pCsr->nKey, &pCsr->blob ); | | | 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 | } if( iPg<0 || iCell<0 ) return LSM_CORRUPT_BKPT; rc = pageGetBtreeKey( pCsr->aPg[iPg].pPage, iCell, &dummy, &pCsr->eType, &pCsr->pKey, &pCsr->nKey, &pCsr->blob ); pCsr->eType |= LSM_SEPARATOR; } return rc; } static int btreeCursorPtr(u8 *aData, int nData, int iCell){ int nCell; |
︙ | ︙ | |||
918 919 920 921 922 923 924 | if( pCsr->aPg[i].iCell>0 ) break; } assert( i>=0 ); rc = pageGetBtreeKey( pCsr->aPg[i].pPage, pCsr->aPg[i].iCell-1, &dummy, &pCsr->eType, &pCsr->pKey, &pCsr->nKey, &pCsr->blob ); | | | 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 | if( pCsr->aPg[i].iCell>0 ) break; } assert( i>=0 ); rc = pageGetBtreeKey( pCsr->aPg[i].pPage, pCsr->aPg[i].iCell-1, &dummy, &pCsr->eType, &pCsr->pKey, &pCsr->nKey, &pCsr->blob ); pCsr->eType |= LSM_SEPARATOR; }else{ rc = btreeCursorLoadKey(pCsr); } } } return rc; |
︙ | ︙ | |||
1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 | static int segmentPtrAdvance( MultiCursor *pCsr, SegmentPtr *pPtr, int bReverse ){ int eDir = (bReverse ? -1 : 1); do { int rc; int iCell; /* Number of new cell in page */ iCell = pPtr->iCell + eDir; assert( pPtr->pPg ); assert( iCell<=pPtr->nCell && iCell>=-1 ); if( iCell>=pPtr->nCell || iCell<0 ){ do { rc = segmentPtrNextPage(pPtr, eDir); }while( rc==LSM_OK && pPtr->pPg && (pPtr->nCell==0 || (pPtr->flags & SEGMENT_BTREE_FLAG) ) ); if( rc!=LSM_OK ) return rc; iCell = bReverse ? (pPtr->nCell-1) : 0; } rc = segmentPtrLoadCell(pPtr, iCell); if( rc!=LSM_OK ) return rc; }while( pCsr && pPtr->pPg && segmentPtrIgnoreSeparators(pCsr, pPtr) && rtIsSeparator(pPtr->eType) ); return LSM_OK; | > > > > > > > > > > > > > > > > > > > > > > > | 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 | static int segmentPtrAdvance( MultiCursor *pCsr, SegmentPtr *pPtr, int bReverse ){ int eDir = (bReverse ? -1 : 1); Level *pLvl = pPtr->pLevel; do { int rc; int iCell; /* Number of new cell in page */ int svFlags = 0; /* SegmentPtr.eType before advance */ iCell = pPtr->iCell + eDir; assert( pPtr->pPg ); assert( iCell<=pPtr->nCell && iCell>=-1 ); if( bReverse && pPtr->pSeg!=&pPtr->pLevel->lhs ){ svFlags = pPtr->eType; assert( svFlags ); } if( iCell>=pPtr->nCell || iCell<0 ){ do { rc = segmentPtrNextPage(pPtr, eDir); }while( rc==LSM_OK && pPtr->pPg && (pPtr->nCell==0 || (pPtr->flags & SEGMENT_BTREE_FLAG) ) ); if( rc!=LSM_OK ) return rc; iCell = bReverse ? (pPtr->nCell-1) : 0; } rc = segmentPtrLoadCell(pPtr, iCell); if( rc!=LSM_OK ) return rc; if( svFlags && pPtr->pPg ){ int res = sortedKeyCompare(pCsr->pDb->xCmp, rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey, pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey ); if( res<0 ) segmentPtrReset(pPtr); } if( pPtr->pPg==0 && (svFlags & LSM_END_DELETE) ){ rc = lsmFsDbPageGet(pCsr->pDb->pFS, pPtr->pSeg->iFirst, &pPtr->pPg); if( rc!=LSM_OK ) return rc; pPtr->eType = LSM_START_DELETE | (pLvl->iSplitTopic ? LSM_SYSTEMKEY : 0); pPtr->pKey = pLvl->pSplitKey; pPtr->nKey = pLvl->nSplitKey; } }while( pCsr && pPtr->pPg && segmentPtrIgnoreSeparators(pCsr, pPtr) && rtIsSeparator(pPtr->eType) ); return LSM_OK; |
︙ | ︙ | |||
1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 | ** points at either the first (bLast==0) or last (bLast==1) cell in the valid ** region of the segment defined by pPtr->iFirst and pPtr->iLast. ** ** Return LSM_OK if successful or an lsm error code if something goes ** wrong (IO error, OOM etc.). */ static int segmentPtrEnd(MultiCursor *pCsr, SegmentPtr *pPtr, int bLast){ int rc = LSM_OK; FileSystem *pFS = pCsr->pDb->pFS; int bIgnore; segmentPtrEndPage(pFS, pPtr, bLast, &rc); while( rc==LSM_OK && pPtr->pPg && (pPtr->nCell==0 || (pPtr->flags & SEGMENT_BTREE_FLAG)) ){ rc = segmentPtrNextPage(pPtr, (bLast ? -1 : 1)); } | > | | > > > > > > > > > > > > > | 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 | ** points at either the first (bLast==0) or last (bLast==1) cell in the valid ** region of the segment defined by pPtr->iFirst and pPtr->iLast. ** ** Return LSM_OK if successful or an lsm error code if something goes ** wrong (IO error, OOM etc.). */ static int segmentPtrEnd(MultiCursor *pCsr, SegmentPtr *pPtr, int bLast){ Level *pLvl = pPtr->pLevel; int rc = LSM_OK; FileSystem *pFS = pCsr->pDb->pFS; int bIgnore; segmentPtrEndPage(pFS, pPtr, bLast, &rc); while( rc==LSM_OK && pPtr->pPg && (pPtr->nCell==0 || (pPtr->flags & SEGMENT_BTREE_FLAG)) ){ rc = segmentPtrNextPage(pPtr, (bLast ? -1 : 1)); } if( rc==LSM_OK && pPtr->pPg ){ rc = segmentPtrLoadCell(pPtr, bLast ? (pPtr->nCell-1) : 0); } bIgnore = segmentPtrIgnoreSeparators(pCsr, pPtr); if( rc==LSM_OK && pPtr->pPg && bIgnore && rtIsSeparator(pPtr->eType) ){ rc = segmentPtrAdvance(pCsr, pPtr, bLast); } if( bLast && rc==LSM_OK && pPtr->pPg && pPtr->pSeg==&pLvl->lhs && pLvl->nRight && (pPtr->eType & LSM_START_DELETE) ){ pPtr->iCell++; pPtr->eType = LSM_END_DELETE | (pLvl->iSplitTopic); pPtr->pKey = pLvl->pSplitKey; pPtr->nKey = pLvl->nSplitKey; pPtr->pVal = 0; pPtr->nVal = 0; } return rc; } static void segmentPtrKey(SegmentPtr *pPtr, void **ppKey, int *pnKey){ assert( pPtr->pPg ); *ppKey = pPtr->pKey; *pnKey = pPtr->nKey; |
︙ | ︙ | |||
1291 1292 1293 1294 1295 1296 1297 | if( eSeek==LSM_SEEK_GE ) return (res<=0); } return 1; } #endif | | | < < < < < < < < | 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 | if( eSeek==LSM_SEEK_GE ) return (res<=0); } return 1; } #endif static int segmentPtrSearchOversized( MultiCursor *pCsr, /* Cursor context */ SegmentPtr *pPtr, /* Pointer to seek */ void *pKey, int nKey /* Key to seek to */ ){ int (*xCmp)(void *, int, void *, int) = pCsr->pDb->xCmp; int rc = LSM_OK; /* If the OVERSIZED flag is set, then there is no pointer in the ** upper level to the next page in the segment that contains at least ** one key. So compare the largest key on the current page with the ** key being sought (pKey/nKey). If (pKey/nKey) is larger, advance ** to the next page in the segment that contains at least one key. */ |
︙ | ︙ | |||
1329 1330 1331 1332 1333 1334 1335 | pPtr->pPg, pPtr->nCell-1, &iLastTopic, &nLastKey, &pPtr->blob1 ); /* If the loaded key is >= than (pKey/nKey), break out of the loop. ** If (pKey/nKey) is present in this array, it must be on the current ** page. */ res = sortedKeyCompare( | | | 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 | pPtr->pPg, pPtr->nCell-1, &iLastTopic, &nLastKey, &pPtr->blob1 ); /* If the loaded key is >= than (pKey/nKey), break out of the loop. ** If (pKey/nKey) is present in this array, it must be on the current ** page. */ res = sortedKeyCompare( xCmp, iLastTopic, pLastKey, nLastKey, 0, pKey, nKey ); if( res>=0 ) break; /* Advance to the next page that contains at least one key. */ pNext = pPtr->pPg; lsmFsPageRef(pNext); while( 1 ){ |
︙ | ︙ | |||
1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 | if( pNext==0 ) break; segmentPtrSetPage(pPtr, pNext); /* This should probably be an LSM_CORRUPT error. */ assert( rc!=LSM_OK || (pPtr->flags & PGFTR_SKIP_THIS_FLAG) ); } iPtrOut = pPtr->iPtr; /* Assert that this page is the right page of this segment for the key ** that we are searching for. Do this by loading page (iPg-1) and testing ** that pKey/nKey is greater than all keys on that page, and then by ** loading (iPg+1) and testing that pKey/nKey is smaller than all | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | > > > | 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 | if( pNext==0 ) break; segmentPtrSetPage(pPtr, pNext); /* This should probably be an LSM_CORRUPT error. */ assert( rc!=LSM_OK || (pPtr->flags & PGFTR_SKIP_THIS_FLAG) ); } return rc; } static int ptrFwdPointer( Page *pPage, int iCell, Segment *pSeg, Pgno *piPtr, int *pbFound ){ Page *pPg = pPage; int iFirst = iCell; int rc = LSM_OK; do { Page *pNext = 0; u8 *aData; int nData; aData = lsmFsPageData(pPg, &nData); if( (pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG)==0 ){ int i; int nCell = pageGetNRec(aData, nData); for(i=iFirst; i<nCell; i++){ u8 eType = *pageGetCell(aData, nData, i); if( (eType & LSM_START_DELETE)==0 ){ *pbFound = 1; *piPtr = pageGetRecordPtr(aData, nData, i) + pageGetPtr(aData, nData); lsmFsPageRelease(pPg); return LSM_OK; } } } rc = lsmFsDbPageNext(pSeg, pPg, 1, &pNext); lsmFsPageRelease(pPg); pPg = pNext; iFirst = 0; }while( pPg && rc==LSM_OK ); lsmFsPageRelease(pPg); *pbFound = 0; return rc; } static int sortedRhsFirst(MultiCursor *pCsr, Level *pLvl, SegmentPtr *pPtr){ int rc; rc = segmentPtrEnd(pCsr, pPtr, 0); while( pPtr->pPg && rc==LSM_OK ){ int res = sortedKeyCompare(pCsr->pDb->xCmp, pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey, rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey ); if( res<=0 ) break; rc = segmentPtrAdvance(pCsr, pPtr, 0); } return rc; } /* ** This function is called as part of a SEEK_GE op on a multi-cursor if the ** FC pointer read from segment *pPtr comes from an entry with the ** LSM_START_DELETE flag set. In this case the pointer value cannot be ** trusted. Instead, the pointer that should be followed is that associated ** with the next entry in *pPtr that does not have LSM_START_DELETE set. ** ** Why the pointers can't be trusted: ** ** ** ** TODO: This is a stop-gap solution: ** ** At the moment, this function is called from within segmentPtrSeek(), ** as part of the initial lsmMCursorSeek() call. However, consider a ** database where the following has occurred: ** ** 1. A range delete removes keys 1..9999 using a range delete. ** 2. Keys 1 through 9999 are reinserted. ** 3. The levels containing the ops in 1. and 2. above are merged. Call ** this level N. Level N contains FC pointers to level N+1. ** ** Then, if the user attempts to query for (key>=2 LIMIT 10), the ** lsmMCursorSeek() call will iterate through 9998 entries searching for a ** pointer down to the level N+1 that is never actually used. It would be ** much better if the multi-cursor could do this lazily - only seek to the ** level (N+1) page after the user has moved the cursor on level N passed ** the big range-delete. */ static int segmentPtrFwdPointer( MultiCursor *pCsr, /* Multi-cursor pPtr belongs to */ SegmentPtr *pPtr, /* Segment-pointer to extract FC ptr from */ Pgno *piPtr /* OUT: FC pointer value */ ){ Level *pLvl = pPtr->pLevel; Level *pNext = pLvl->pNext; Page *pPg = pPtr->pPg; int rc; int bFound; Pgno iOut = 0; if( pPtr->pSeg==&pLvl->lhs || pPtr->pSeg==&pLvl->aRhs[pLvl->nRight-1] ){ if( pNext==0 || (pNext->nRight==0 && pNext->lhs.iRoot) || (pNext->nRight!=0 && pNext->aRhs[0].iRoot) ){ /* Do nothing. The pointer will not be used anyway. */ return LSM_OK; } }else{ if( pPtr[1].pSeg->iRoot ){ return LSM_OK; } } /* Search for a pointer within the current segment. */ lsmFsPageRef(pPg); rc = ptrFwdPointer(pPg, pPtr->iCell, pPtr->pSeg, &iOut, &bFound); if( rc==LSM_OK && bFound==0 ){ /* This case happens when pPtr points to the left-hand-side of a segment ** currently undergoing an incremental merge. In this case, jump to the ** oldest segment in the right-hand-side of the same level and continue ** searching. But - do not consider any keys smaller than the levels ** split-key. */ SegmentPtr ptr; if( pPtr->pLevel->nRight==0 || pPtr->pSeg!=&pPtr->pLevel->lhs ){ return LSM_CORRUPT_BKPT; } memset(&ptr, 0, sizeof(SegmentPtr)); ptr.pLevel = pPtr->pLevel; ptr.pSeg = &ptr.pLevel->aRhs[ptr.pLevel->nRight-1]; rc = sortedRhsFirst(pCsr, ptr.pLevel, &ptr); if( rc==LSM_OK ){ rc = ptrFwdPointer(ptr.pPg, ptr.iCell, ptr.pSeg, &iOut, &bFound); ptr.pPg = 0; } segmentPtrReset(&ptr); } *piPtr = iOut; return rc; } static int segmentPtrSeek( MultiCursor *pCsr, /* Cursor context */ SegmentPtr *pPtr, /* Pointer to seek */ void *pKey, int nKey, /* Key to seek to */ int eSeek, /* Search bias - see above */ int *piPtr, /* OUT: FC pointer */ int *pbStop ){ int (*xCmp)(void *, int, void *, int) = pCsr->pDb->xCmp; int res; /* Result of comparison operation */ int rc = LSM_OK; int iMin; int iMax; int iPtrOut = 0; const int iTopic = 0; /* If the current page contains an oversized entry, then there are no ** pointers to one or more of the subsequent pages in the sorted run. ** The following call ensures that the segment-ptr points to the correct ** page in this case. */ rc = segmentPtrSearchOversized(pCsr, pPtr, pKey, nKey); iPtrOut = pPtr->iPtr; /* Assert that this page is the right page of this segment for the key ** that we are searching for. Do this by loading page (iPg-1) and testing ** that pKey/nKey is greater than all keys on that page, and then by ** loading (iPg+1) and testing that pKey/nKey is smaller than all ** the keys it houses. ** ** TODO: With range-deletes in the tree, the test described above may fail. */ #if 0 assert( assertKeyLocation(pCsr, pPtr, pKey, nKey) ); #endif assert( pPtr->nCell>0 || pPtr->pSeg->nSize==1 || lsmFsPageNumber(pPtr->pPg)==pPtr->pSeg->iLast |
︙ | ︙ | |||
1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 | } if( rc==LSM_OK ){ assert( res==0 || (iMin==iMax && iMin>=0 && iMin<pPtr->nCell) ); if( res ){ rc = segmentPtrLoadCell(pPtr, iMin); } if( rc==LSM_OK ){ switch( eSeek ){ | > | > > > > > > > > > > > > > > > > | > | > > > > > > > | > > | > | 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 1655 1656 1657 | } if( rc==LSM_OK ){ assert( res==0 || (iMin==iMax && iMin>=0 && iMin<pPtr->nCell) ); if( res ){ rc = segmentPtrLoadCell(pPtr, iMin); } assert( rc!=LSM_OK || res>0 || iPtrOut==(pPtr->iPtr + pPtr->iPgPtr) ); if( rc==LSM_OK ){ switch( eSeek ){ case LSM_SEEK_EQ: { int eType = pPtr->eType; if( (res<0 && (eType & LSM_START_DELETE)) || (res>0 && (eType & LSM_END_DELETE)) || (res==0 && (eType & LSM_POINT_DELETE)) ){ *pbStop = 1; }else if( res==0 && (eType & LSM_INSERT) ){ lsm_env *pEnv = pCsr->pDb->pEnv; *pbStop = 1; pCsr->eType = pPtr->eType; rc = sortedBlobSet(pEnv, &pCsr->key, pPtr->pKey, pPtr->nKey); if( rc==LSM_OK ){ rc = sortedBlobSet(pEnv, &pCsr->val, pPtr->pVal, pPtr->nVal); } pCsr->flags |= CURSOR_SEEK_EQ; } segmentPtrReset(pPtr); break; } case LSM_SEEK_LE: if( res>0 ) rc = segmentPtrAdvance(pCsr, pPtr, 1); break; case LSM_SEEK_GE: { /* Figure out if we need to 'skip' the pointer forward or not */ if( (res<=0 && (pPtr->eType & LSM_START_DELETE)) || (res>0 && (pPtr->eType & LSM_END_DELETE)) ){ rc = segmentPtrFwdPointer(pCsr, pPtr, &iPtrOut); } if( res<0 && rc==LSM_OK ){ rc = segmentPtrAdvance(pCsr, pPtr, 0); } break; } } } } /* If the cursor seek has found a separator key, and this cursor is ** supposed to ignore separators keys, advance to the next entry. */ if( rc==LSM_OK && pPtr->pPg && segmentPtrIgnoreSeparators(pCsr, pPtr) && rtIsSeparator(pPtr->eType) ){ assert( eSeek!=LSM_SEEK_EQ ); rc = segmentPtrAdvance(pCsr, pPtr, eSeek==LSM_SEEK_LE); } } assert( rc!=LSM_OK || assertSeekResult(pCsr,pPtr,iTopic,pKey,nKey,eSeek) ); *piPtr = iPtrOut; return rc; |
︙ | ︙ | |||
1531 1532 1533 1534 1535 1536 1537 | static int seekInSegment( MultiCursor *pCsr, SegmentPtr *pPtr, void *pKey, int nKey, int iPg, /* Page to search */ int eSeek, /* Search bias - see above */ | | > | > > > > > > > > > > > < | | > > > | | | > > | 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764 1765 1766 1767 1768 1769 1770 1771 1772 1773 1774 1775 1776 1777 1778 1779 1780 1781 1782 1783 1784 1785 1786 1787 1788 1789 1790 1791 1792 1793 1794 1795 1796 1797 1798 1799 1800 1801 1802 1803 1804 1805 1806 1807 1808 1809 1810 1811 1812 1813 1814 1815 1816 1817 1818 1819 1820 1821 1822 1823 1824 1825 1826 1827 1828 1829 1830 1831 1832 1833 1834 1835 1836 1837 1838 1839 1840 1841 1842 1843 1844 1845 1846 1847 | static int seekInSegment( MultiCursor *pCsr, SegmentPtr *pPtr, void *pKey, int nKey, int iPg, /* Page to search */ int eSeek, /* Search bias - see above */ int *piPtr, /* OUT: FC pointer */ int *pbStop /* OUT: Stop search flag */ ){ int iPtr = iPg; int rc = LSM_OK; if( pPtr->pSeg->iRoot ){ Page *pPg; assert( pPtr->pSeg->iRoot!=0 ); rc = seekInBtree(pCsr, pPtr->pSeg, pKey, nKey, 0, &pPg); if( rc==LSM_OK ) segmentPtrSetPage(pPtr, pPg); }else{ if( iPtr==0 ){ iPtr = pPtr->pSeg->iFirst; } if( rc==LSM_OK ){ rc = segmentPtrLoadPage(pCsr->pDb->pFS, pPtr, iPtr); } } if( rc==LSM_OK ){ rc = segmentPtrSeek(pCsr, pPtr, pKey, nKey, eSeek, piPtr, pbStop); } return rc; } /* ** Seek each segment pointer in the array of (pLvl->nRight+1) at aPtr[]. ** ** pbStop: ** This parameter is only significant if parameter eSeek is set to ** LSM_SEEK_EQ. In this case, it is set to true before returning if ** the seek operation is finished. This can happen in two ways: ** ** a) A key matching (pKey/nKey) is found, or ** b) A point-delete or range-delete deleting the key is found. ** ** In case (a), the multi-cursor CURSOR_SEEK_EQ flag is set and the pCsr->key ** and pCsr->val blobs populated before returning. */ static int seekInLevel( MultiCursor *pCsr, /* Sorted cursor object to seek */ SegmentPtr *aPtr, /* Pointer to array of (nRhs+1) SPs */ int eSeek, /* Search bias - see above */ void *pKey, int nKey, /* Key to search for */ Pgno *piPgno, /* IN/OUT: fraction cascade pointer (or 0) */ int *pbStop /* OUT: See above */ ){ Level *pLvl = aPtr[0].pLevel; /* Level to seek within */ int rc = LSM_OK; /* Return code */ int iOut = 0; /* Pointer to return to caller */ int res = -1; /* Result of xCmp(pKey, split) */ int nRhs = pLvl->nRight; /* Number of right-hand-side segments */ int bStop = 0; /* If this is a composite level (one currently undergoing an incremental ** merge), figure out if the search key is larger or smaller than the ** levels split-key. */ if( nRhs ){ res = sortedKeyCompare(pCsr->pDb->xCmp, 0, pKey, nKey, pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey ); } /* If (res<0), then key pKey/nKey is smaller than the split-key (or this ** is not a composite level and there is no split-key). Search the ** left-hand-side of the level in this case. */ if( res<0 ){ int iPtr = 0; if( nRhs==0 ) iPtr = *piPgno; rc = seekInSegment(pCsr, &aPtr[0], pKey, nKey, iPtr, eSeek, &iOut, &bStop); if( rc==LSM_OK && nRhs>0 && eSeek==LSM_SEEK_GE && aPtr[0].pPg==0 ){ res = 0; } } if( res>=0 ){ int iPtr = *piPgno; int i; for(i=1; rc==LSM_OK && i<=nRhs && bStop==0; i++){ iOut = 0; rc = seekInSegment(pCsr, &aPtr[i], pKey, nKey, iPtr, eSeek, &iOut,&bStop); iPtr = iOut; } if( rc==LSM_OK && eSeek==LSM_SEEK_LE ){ rc = segmentPtrEnd(pCsr, &aPtr[0], 1); } } assert( eSeek==LSM_SEEK_EQ || bStop==0 ); *piPgno = iOut; *pbStop = bStop; return rc; } static void multiCursorGetKey( MultiCursor *pCsr, int iKey, int *peType, /* OUT: Key type (SORTED_WRITE etc.) */ |
︙ | ︙ | |||
1631 1632 1633 1634 1635 1636 1637 | case CURSOR_DATA_TREE0: case CURSOR_DATA_TREE1: { TreeCursor *pTreeCsr = pCsr->apTreeCsr[iKey-CURSOR_DATA_TREE0]; if( lsmTreeCursorValid(pTreeCsr) ){ int nVal; void *pVal; | | < | | 1856 1857 1858 1859 1860 1861 1862 1863 1864 1865 1866 1867 1868 1869 1870 1871 1872 1873 1874 1875 1876 1877 1878 1879 1880 | case CURSOR_DATA_TREE0: case CURSOR_DATA_TREE1: { TreeCursor *pTreeCsr = pCsr->apTreeCsr[iKey-CURSOR_DATA_TREE0]; if( lsmTreeCursorValid(pTreeCsr) ){ int nVal; void *pVal; lsmTreeCursorKey(pTreeCsr, &eType, &pKey, &nKey); lsmTreeCursorValue(pTreeCsr, &pVal, &nVal); } break; } case CURSOR_DATA_SYSTEM: if( pCsr->flags & CURSOR_AT_FREELIST ){ pKey = (void *)"FREELIST"; nKey = 8; eType = LSM_SYSTEMKEY | LSM_INSERT; } break; default: { int iPtr = iKey - CURSOR_DATA_SEGMENT; assert( iPtr>=0 ); if( iPtr==pCsr->nPtr ){ |
︙ | ︙ | |||
1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 | } if( peType ) *peType = eType; if( pnKey ) *pnKey = nKey; if( ppKey ) *ppKey = pKey; } static void multiCursorDoCompare(MultiCursor *pCsr, int iOut, int bReverse){ int i1; int i2; int iRes; void *pKey1; int nKey1; int eType1; void *pKey2; int nKey2; int eType2; | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > < | | | | > | < < < < < < < < < < < < < < < | < < < < < < < < < < < < < < < < | 1896 1897 1898 1899 1900 1901 1902 1903 1904 1905 1906 1907 1908 1909 1910 1911 1912 1913 1914 1915 1916 1917 1918 1919 1920 1921 1922 1923 1924 1925 1926 1927 1928 1929 1930 1931 1932 1933 1934 1935 1936 1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947 1948 1949 1950 1951 1952 1953 1954 1955 1956 1957 1958 1959 1960 1961 1962 1963 1964 1965 1966 1967 1968 1969 1970 1971 1972 1973 1974 1975 1976 1977 1978 1979 1980 1981 1982 1983 1984 1985 1986 1987 1988 1989 1990 1991 1992 1993 1994 1995 1996 1997 1998 1999 2000 2001 2002 2003 2004 2005 2006 2007 2008 2009 2010 2011 2012 2013 2014 2015 2016 2017 2018 2019 2020 2021 2022 2023 2024 | } if( peType ) *peType = eType; if( pnKey ) *pnKey = nKey; if( ppKey ) *ppKey = pKey; } static int sortedDbKeyCompare( int (*xCmp)(void *, int, void *, int), int iLhsFlags, void *pLhsKey, int nLhsKey, int iRhsFlags, void *pRhsKey, int nRhsKey ){ int res; /* Compare the keys, including the system flag. */ res = sortedKeyCompare(xCmp, rtTopic(iLhsFlags), pLhsKey, nLhsKey, rtTopic(iRhsFlags), pRhsKey, nRhsKey ); /* If a key has the LSM_START_DELETE flag set, but not the LSM_INSERT or ** LSM_POINT_DELETE flags, it is considered a delta larger. This prevents ** the beginning of an open-ended set from masking a database entry or ** delete at a lower level. */ if( res==0 ){ const int insdel = LSM_POINT_DELETE|LSM_INSERT; int iDel1 = 0; int iDel2 = 0; if( LSM_START_DELETE==(iLhsFlags & (LSM_START_DELETE|insdel)) ) iDel1 = +1; if( LSM_END_DELETE ==(iLhsFlags & (LSM_END_DELETE |insdel)) ) iDel1 = -1; if( LSM_START_DELETE==(iRhsFlags & (LSM_START_DELETE|insdel)) ) iDel2 = +1; if( LSM_END_DELETE ==(iRhsFlags & (LSM_END_DELETE |insdel)) ) iDel2 = -1; res = (iDel1 - iDel2); } return res; } static void multiCursorDoCompare(MultiCursor *pCsr, int iOut, int bReverse){ int i1; int i2; int iRes; void *pKey1; int nKey1; int eType1; void *pKey2; int nKey2; int eType2; const int mul = (bReverse ? -1 : 1); assert( pCsr->aTree && iOut<pCsr->nTree ); if( iOut>=(pCsr->nTree/2) ){ i1 = (iOut - pCsr->nTree/2) * 2; i2 = i1 + 1; }else{ i1 = pCsr->aTree[iOut*2]; i2 = pCsr->aTree[iOut*2+1]; } multiCursorGetKey(pCsr, i1, &eType1, &pKey1, &nKey1); multiCursorGetKey(pCsr, i2, &eType2, &pKey2, &nKey2); if( pKey1==0 ){ iRes = i2; }else if( pKey2==0 ){ iRes = i1; }else{ int res; /* Compare the keys */ res = sortedDbKeyCompare(pCsr->pDb->xCmp, eType1, pKey1, nKey1, eType2, pKey2, nKey2 ); res = res * mul; if( res==0 ){ iRes = (rtIsSeparator(eType1) ? i2 : i1); }else if( res<0 ){ iRes = i1; }else{ iRes = i2; } } pCsr->aTree[iOut] = iRes; } /* ** This function advances segment pointer iPtr belonging to multi-cursor ** pCsr forward (bReverse==0) or backward (bReverse!=0). ** ** If the segment pointer points to a segment that is part of a composite ** level, then the following special case is handled. ** ** * If iPtr is the lhs of a composite level, and the cursor is being ** advanced forwards, and segment iPtr is at EOF, move all pointers ** that correspond to rhs segments of the same level to the first ** key in their respective data. */ static int segmentCursorAdvance( MultiCursor *pCsr, int iPtr, int bReverse ){ int rc; SegmentPtr *pPtr = &pCsr->aPtr[iPtr]; Level *pLvl = pPtr->pLevel; int bComposite; rc = segmentPtrAdvance(pCsr, pPtr, bReverse); if( rc!=LSM_OK ) return rc; bComposite = (pLvl->nRight>0 && pCsr->nPtr>pLvl->nRight); if( bComposite && pPtr->pSeg==&pLvl->lhs /* lhs of composite level */ && bReverse==0 /* csr advanced forwards */ && pPtr->pPg==0 /* segment at EOF */ ){ int i; for(i=0; rc==LSM_OK && i<pLvl->nRight; i++){ rc = sortedRhsFirst(pCsr, pLvl, &pCsr->aPtr[iPtr+1+i]); } for(i=pCsr->nTree-1; i>0; i--){ multiCursorDoCompare(pCsr, i, 0); } } return rc; } static void mcursorFreeComponents(MultiCursor *pCsr){ int i; lsm_env *pEnv = pCsr->pDb->pEnv; |
︙ | ︙ | |||
2077 2078 2079 2080 2081 2082 2083 | rc = multiCursorAddAll(pCsr, pDb->pWorker); pCsr->flags |= CURSOR_IGNORE_DELETE; } if( rc==LSM_OK ){ rc = lsmMCursorLast(pCsr); if( rc==LSM_OK | | | 2300 2301 2302 2303 2304 2305 2306 2307 2308 2309 2310 2311 2312 2313 2314 | rc = multiCursorAddAll(pCsr, pDb->pWorker); pCsr->flags |= CURSOR_IGNORE_DELETE; } if( rc==LSM_OK ){ rc = lsmMCursorLast(pCsr); if( rc==LSM_OK && rtIsWrite(pCsr->eType) && rtIsSystem(pCsr->eType) && pCsr->key.nData==8 && 0==memcmp(pCsr->key.pData, "FREELIST", 8) ){ void *pVal; int nVal; /* Value read from database */ rc = lsmMCursorValue(pCsr, &pVal, &nVal); if( rc==LSM_OK ){ *ppVal = lsmMallocRc(pDb->pEnv, nVal, &rc); |
︙ | ︙ | |||
2124 2125 2126 2127 2128 2129 2130 2131 2132 2133 2134 2135 2136 2137 2138 2139 2140 2141 2142 2143 | if( *pRc==LSM_OK ){ void *pKey; int nKey; multiCursorGetKey(pCsr, pCsr->aTree[1], &pCsr->eType, &pKey, &nKey); *pRc = sortedBlobSet(pCsr->pDb->pEnv, &pCsr->key, pKey, nKey); } } static int multiCursorEnd(MultiCursor *pCsr, int bLast){ int rc = LSM_OK; int i; pCsr->flags &= ~(CURSOR_NEXT_OK | CURSOR_PREV_OK); if( pCsr->apTreeCsr[0] ){ rc = lsmTreeCursorEnd(pCsr->apTreeCsr[0], bLast); } if( rc==LSM_OK && pCsr->apTreeCsr[1] ){ rc = lsmTreeCursorEnd(pCsr->apTreeCsr[1], bLast); } | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 2347 2348 2349 2350 2351 2352 2353 2354 2355 2356 2357 2358 2359 2360 2361 2362 2363 2364 2365 2366 2367 2368 2369 2370 2371 2372 2373 2374 2375 2376 2377 2378 2379 2380 2381 2382 2383 2384 2385 2386 2387 2388 2389 2390 2391 2392 2393 2394 2395 2396 2397 2398 2399 2400 2401 2402 2403 2404 2405 | if( *pRc==LSM_OK ){ void *pKey; int nKey; multiCursorGetKey(pCsr, pCsr->aTree[1], &pCsr->eType, &pKey, &nKey); *pRc = sortedBlobSet(pCsr->pDb->pEnv, &pCsr->key, pKey, nKey); } } static int mcursorLocationOk(MultiCursor *pCsr, int bDeleteOk){ int eType = pCsr->eType; int iKey; int i; int rdmask = 0; assert( pCsr->flags & (CURSOR_NEXT_OK|CURSOR_PREV_OK) ); if( pCsr->flags & CURSOR_NEXT_OK ){ rdmask = LSM_END_DELETE; }else{ rdmask = LSM_START_DELETE; } if( (pCsr->flags & CURSOR_IGNORE_DELETE) && bDeleteOk==0 ){ if( (eType & LSM_INSERT)==0 ) return 0; } if( (pCsr->flags & CURSOR_IGNORE_SYSTEM) && rtTopic(eType)!=0 ){ return 0; } /* Check if this key has already been deleted by a range-delete */ iKey = pCsr->aTree[1]; if( (iKey>0 && (rdmask & lsmTreeCursorFlags(pCsr->apTreeCsr[0]))) || (iKey>1 && (rdmask & lsmTreeCursorFlags(pCsr->apTreeCsr[1]))) ){ return 0; } for(i=CURSOR_DATA_SEGMENT; i<iKey; i++){ int iPtr = i-CURSOR_DATA_SEGMENT; if( pCsr->aPtr[iPtr].pPg && (pCsr->aPtr[iPtr].eType & rdmask) ){ return 0; } } return 1; } static int multiCursorEnd(MultiCursor *pCsr, int bLast){ int rc = LSM_OK; int i; pCsr->flags &= ~(CURSOR_NEXT_OK | CURSOR_PREV_OK); pCsr->flags |= (bLast ? CURSOR_PREV_OK : CURSOR_NEXT_OK); if( pCsr->apTreeCsr[0] ){ rc = lsmTreeCursorEnd(pCsr->apTreeCsr[0], bLast); } if( rc==LSM_OK && pCsr->apTreeCsr[1] ){ rc = lsmTreeCursorEnd(pCsr->apTreeCsr[1], bLast); } |
︙ | ︙ | |||
2173 2174 2175 2176 2177 2178 2179 | if( rc==LSM_OK ){ rc = multiCursorAllocTree(pCsr); } if( rc==LSM_OK ){ for(i=pCsr->nTree-1; i>0; i--){ multiCursorDoCompare(pCsr, i, bLast); } | < | < < < | 2435 2436 2437 2438 2439 2440 2441 2442 2443 2444 2445 2446 2447 2448 2449 2450 2451 2452 | if( rc==LSM_OK ){ rc = multiCursorAllocTree(pCsr); } if( rc==LSM_OK ){ for(i=pCsr->nTree-1; i>0; i--){ multiCursorDoCompare(pCsr, i, bLast); } } multiCursorCacheKey(pCsr, &rc); if( rc==LSM_OK && mcursorLocationOk(pCsr, 0)==0 ){ if( bLast ){ rc = lsmMCursorPrev(pCsr); }else{ rc = lsmMCursorNext(pCsr); } } |
︙ | ︙ | |||
2255 2256 2257 2258 2259 2260 2261 | lsmTreeCursorReset(pCsr->apTreeCsr[1]); for(i=0; i<pCsr->nPtr; i++){ segmentPtrReset(&pCsr->aPtr[i]); } pCsr->key.nData = 0; } | | > | > > | > > > > | > > > > > > | > > > > > > < < < < < < < < < < < < > | | | < < > | | > | | > | < | | > > | | | | | | | | | | | | | | | | | | | | | | | > > > | | 2513 2514 2515 2516 2517 2518 2519 2520 2521 2522 2523 2524 2525 2526 2527 2528 2529 2530 2531 2532 2533 2534 2535 2536 2537 2538 2539 2540 2541 2542 2543 2544 2545 2546 2547 2548 2549 2550 2551 2552 2553 2554 2555 2556 2557 2558 2559 2560 2561 2562 2563 2564 2565 2566 2567 2568 2569 2570 2571 2572 2573 2574 2575 2576 2577 2578 2579 2580 2581 2582 2583 2584 2585 2586 2587 2588 2589 2590 2591 2592 2593 2594 2595 2596 2597 2598 2599 2600 2601 2602 2603 2604 2605 2606 2607 2608 2609 2610 2611 2612 2613 2614 2615 2616 2617 2618 2619 2620 2621 2622 2623 2624 2625 2626 2627 2628 2629 2630 2631 2632 2633 2634 2635 2636 2637 2638 2639 2640 2641 2642 2643 | lsmTreeCursorReset(pCsr->apTreeCsr[1]); for(i=0; i<pCsr->nPtr; i++){ segmentPtrReset(&pCsr->aPtr[i]); } pCsr->key.nData = 0; } static int treeCursorSeek( MultiCursor *pCsr, TreeCursor *pTreeCsr, void *pKey, int nKey, int eSeek, int *pbStop ){ int rc = LSM_OK; if( pTreeCsr ){ int res = 0; lsmTreeCursorSeek(pTreeCsr, pKey, nKey, &res); switch( eSeek ){ case LSM_SEEK_EQ: { int eType = lsmTreeCursorFlags(pTreeCsr); if( (res<0 && (eType & LSM_START_DELETE)) || (res>0 && (eType & LSM_END_DELETE)) || (res==0 && (eType & LSM_POINT_DELETE)) ){ *pbStop = 1; }else if( res==0 && (eType & LSM_INSERT) ){ lsm_env *pEnv = pCsr->pDb->pEnv; void *p; int n; /* Key/value from tree-cursor */ *pbStop = 1; pCsr->flags |= CURSOR_SEEK_EQ; rc = lsmTreeCursorKey(pTreeCsr, &pCsr->eType, &p, &n); if( rc==LSM_OK ) rc = sortedBlobSet(pEnv, &pCsr->key, p, n); if( rc==LSM_OK ) rc = lsmTreeCursorValue(pTreeCsr, &p, &n); if( rc==LSM_OK ) rc = sortedBlobSet(pEnv, &pCsr->val, p, n); } lsmTreeCursorReset(pTreeCsr); break; } case LSM_SEEK_GE: if( res<0 && lsmTreeCursorValid(pTreeCsr) ){ lsmTreeCursorNext(pTreeCsr); } break; default: if( res>0 ){ assert( lsmTreeCursorValid(pTreeCsr) ); lsmTreeCursorPrev(pTreeCsr); } break; } } return rc; } /* ** Seek the cursor. */ int lsmMCursorSeek(MultiCursor *pCsr, void *pKey, int nKey, int eSeek){ int eESeek = eSeek; /* Effective eSeek parameter */ int bStop = 0; /* Set to true to halt search operation */ int rc = LSM_OK; /* Return code */ int iPtr = 0; /* Used to iterate through pCsr->aPtr[] */ Pgno iPgno = 0; /* FC pointer value */ if( eESeek==LSM_SEEK_LEFAST ) eESeek = LSM_SEEK_LE; assert( eESeek==LSM_SEEK_EQ || eESeek==LSM_SEEK_LE || eESeek==LSM_SEEK_GE ); assert( (pCsr->flags & CURSOR_NEW_SYSTEM)==0 ); assert( (pCsr->flags & CURSOR_AT_FREELIST)==0 ); assert( pCsr->nPtr==0 || pCsr->aPtr[0].pLevel ); pCsr->flags &= ~(CURSOR_NEXT_OK | CURSOR_PREV_OK | CURSOR_SEEK_EQ); rc = treeCursorSeek(pCsr, pCsr->apTreeCsr[0], pKey, nKey, eESeek, &bStop); if( rc==LSM_OK && bStop==0 ){ rc = treeCursorSeek(pCsr, pCsr->apTreeCsr[1], pKey, nKey, eESeek, &bStop); } /* Seek all segment pointers. */ for(iPtr=0; iPtr<pCsr->nPtr && rc==LSM_OK && bStop==0; iPtr++){ SegmentPtr *pPtr = &pCsr->aPtr[iPtr]; assert( pPtr->pSeg==&pPtr->pLevel->lhs ); rc = seekInLevel(pCsr, pPtr, eESeek, pKey, nKey, &iPgno, &bStop); iPtr += pPtr->pLevel->nRight; } if( eSeek!=LSM_SEEK_EQ ){ if( rc==LSM_OK ){ rc = multiCursorAllocTree(pCsr); } if( rc==LSM_OK ){ int i; for(i=pCsr->nTree-1; i>0; i--){ multiCursorDoCompare(pCsr, i, eESeek==LSM_SEEK_LE); } if( eSeek==LSM_SEEK_GE ) pCsr->flags |= CURSOR_NEXT_OK; if( eSeek==LSM_SEEK_LE ) pCsr->flags |= CURSOR_PREV_OK; } multiCursorCacheKey(pCsr, &rc); if( rc==LSM_OK && eSeek!=LSM_SEEK_LEFAST && 0==mcursorLocationOk(pCsr, 0) ){ switch( eESeek ){ case LSM_SEEK_EQ: lsmMCursorReset(pCsr); break; case LSM_SEEK_GE: rc = lsmMCursorNext(pCsr); break; default: rc = lsmMCursorPrev(pCsr); break; } } } return rc; } int lsmMCursorValid(MultiCursor *pCsr){ int res = 0; if( pCsr->flags & CURSOR_SEEK_EQ ){ res = 1; }else if( pCsr->aTree ){ int iKey = pCsr->aTree[1]; if( iKey==CURSOR_DATA_TREE0 || iKey==CURSOR_DATA_TREE1 ){ res = lsmTreeCursorValid(pCsr->apTreeCsr[iKey-CURSOR_DATA_TREE0]); }else{ void *pKey; multiCursorGetKey(pCsr, iKey, 0, &pKey, 0); res = pKey!=0; |
︙ | ︙ | |||
2388 2389 2390 2391 2392 2393 2394 | /* Check the current key value. If it is not greater than (if bReverse==0) ** or less than (if bReverse!=0) the key currently cached in pCsr->key, ** then the cursor has not yet been successfully advanced. */ multiCursorGetKey(pCsr, pCsr->aTree[1], &eNewType, &pNew, &nNew); if( pNew ){ | | < | < > | < | | | | | | | | | | > > > > > > > > > > > > > > > > > < | 2659 2660 2661 2662 2663 2664 2665 2666 2667 2668 2669 2670 2671 2672 2673 2674 2675 2676 2677 2678 2679 2680 2681 2682 2683 2684 2685 2686 2687 2688 2689 2690 2691 2692 2693 2694 2695 2696 2697 2698 2699 2700 2701 2702 2703 2704 2705 2706 2707 2708 2709 2710 2711 2712 2713 2714 2715 2716 2717 2718 2719 2720 2721 2722 2723 2724 2725 2726 2727 2728 2729 2730 2731 2732 2733 | /* Check the current key value. If it is not greater than (if bReverse==0) ** or less than (if bReverse!=0) the key currently cached in pCsr->key, ** then the cursor has not yet been successfully advanced. */ multiCursorGetKey(pCsr, pCsr->aTree[1], &eNewType, &pNew, &nNew); if( pNew ){ int res = sortedDbKeyCompare(pCsr->pDb->xCmp, eNewType, pNew, nNew, pCsr->eType, pCsr->key.pData, pCsr->key.nData ); if( (bReverse==0 && res<=0) || (bReverse!=0 && res>=0) ){ return 0; } multiCursorCacheKey(pCsr, pRc); assert( pCsr->eType==eNewType ); /* If this cursor is configured to skip deleted keys, and the current ** cursor points to a SORTED_DELETE entry, then the cursor has not been ** successfully advanced. ** ** Similarly, if the cursor is configured to skip system keys and the ** current cursor points to a system key, it has not yet been advanced. */ if( *pRc==LSM_OK && 0==mcursorLocationOk(pCsr, 0) ) return 0; } return 1; } static int multiCursorAdvance(MultiCursor *pCsr, int bReverse){ int rc = LSM_OK; /* Return Code */ if( lsmMCursorValid(pCsr) ){ do { int iKey = pCsr->aTree[1]; /* If this multi-cursor is advancing forwards, and the sub-cursor ** being advanced is the one that separator keys may be being read ** from, record the current absolute pointer value. */ if( pCsr->pPrevMergePtr ){ if( iKey==(CURSOR_DATA_SEGMENT+pCsr->nPtr) ){ assert( pCsr->pBtCsr ); *pCsr->pPrevMergePtr = pCsr->pBtCsr->iPtr; }else if( pCsr->pBtCsr==0 && pCsr->nPtr>0 && iKey==(CURSOR_DATA_SEGMENT+pCsr->nPtr-1) ){ SegmentPtr *pPtr = &pCsr->aPtr[iKey-CURSOR_DATA_SEGMENT]; *pCsr->pPrevMergePtr = pPtr->iPtr+pPtr->iPgPtr; } } if( iKey==CURSOR_DATA_TREE0 || iKey==CURSOR_DATA_TREE1 ){ TreeCursor *pTreeCsr = pCsr->apTreeCsr[iKey-CURSOR_DATA_TREE0]; if( bReverse ){ rc = lsmTreeCursorPrev(pTreeCsr); }else{ rc = lsmTreeCursorNext(pTreeCsr); } }else if( iKey==CURSOR_DATA_SYSTEM ){ assert( pCsr->flags & CURSOR_AT_FREELIST ); assert( pCsr->flags & CURSOR_NEW_SYSTEM ); assert( bReverse==0 ); pCsr->flags &= ~CURSOR_AT_FREELIST; }else if( iKey==(CURSOR_DATA_SEGMENT+pCsr->nPtr) ){ assert( bReverse==0 && pCsr->pBtCsr ); rc = btreeCursorNext(pCsr->pBtCsr); }else{ rc = segmentCursorAdvance(pCsr, iKey-CURSOR_DATA_SEGMENT, bReverse); } if( rc==LSM_OK ){ int i; for(i=(iKey+pCsr->nTree)/2; i>0; i=i/2){ multiCursorDoCompare(pCsr, i, bReverse); } } }while( mcursorAdvanceOk(pCsr, bReverse, &rc)==0 ); |
︙ | ︙ | |||
2457 2458 2459 2460 2461 2462 2463 | int lsmMCursorPrev(MultiCursor *pCsr){ if( (pCsr->flags & CURSOR_PREV_OK)==0 ) return LSM_MISUSE_BKPT; return multiCursorAdvance(pCsr, 1); } int lsmMCursorKey(MultiCursor *pCsr, void **ppKey, int *pnKey){ | > > > > | | | | | | | | | | | | | | | | | | | > > > > > > | | | | | | | | | | > | 2742 2743 2744 2745 2746 2747 2748 2749 2750 2751 2752 2753 2754 2755 2756 2757 2758 2759 2760 2761 2762 2763 2764 2765 2766 2767 2768 2769 2770 2771 2772 2773 2774 2775 2776 2777 2778 2779 2780 2781 2782 2783 2784 2785 2786 2787 2788 2789 2790 2791 2792 2793 2794 2795 2796 2797 2798 2799 2800 2801 2802 2803 2804 2805 2806 2807 2808 2809 2810 2811 | int lsmMCursorPrev(MultiCursor *pCsr){ if( (pCsr->flags & CURSOR_PREV_OK)==0 ) return LSM_MISUSE_BKPT; return multiCursorAdvance(pCsr, 1); } int lsmMCursorKey(MultiCursor *pCsr, void **ppKey, int *pnKey){ if( pCsr->flags & CURSOR_SEEK_EQ ){ *pnKey = pCsr->key.nData; *ppKey = pCsr->key.pData; }else{ int iKey = pCsr->aTree[1]; if( iKey==CURSOR_DATA_TREE0 || iKey==CURSOR_DATA_TREE1 ){ TreeCursor *pTreeCsr = pCsr->apTreeCsr[iKey-CURSOR_DATA_TREE0]; lsmTreeCursorKey(pTreeCsr, 0, ppKey, pnKey); }else{ int nKey; #ifndef NDEBUG void *pKey; int eType; multiCursorGetKey(pCsr, iKey, &eType, &pKey, &nKey); assert( eType==pCsr->eType ); assert( nKey==pCsr->key.nData ); assert( memcmp(pKey, pCsr->key.pData, nKey)==0 ); #endif nKey = pCsr->key.nData; if( nKey==0 ){ *ppKey = 0; }else{ *ppKey = pCsr->key.pData; } *pnKey = nKey; } } return LSM_OK; } int lsmMCursorValue(MultiCursor *pCsr, void **ppVal, int *pnVal){ void *pVal; int nVal; int rc; if( pCsr->flags & CURSOR_SEEK_EQ ){ rc = LSM_OK; nVal = pCsr->val.nData; pVal = pCsr->val.pData; }else{ assert( pCsr->aTree ); assert( mcursorLocationOk(pCsr, (pCsr->flags & CURSOR_IGNORE_DELETE)) ); rc = multiCursorGetVal(pCsr, pCsr->aTree[1], &pVal, &nVal); if( pVal && rc==LSM_OK ){ rc = sortedBlobSet(pCsr->pDb->pEnv, &pCsr->val, pVal, nVal); pVal = pCsr->val.pData; } if( rc!=LSM_OK ){ pVal = 0; nVal = 0; } } *ppVal = pVal; *pnVal = nVal; return rc; } int lsmMCursorType(MultiCursor *pCsr, int *peType){ |
︙ | ︙ | |||
2530 2531 2532 2533 2534 2535 2536 | int nKey; int eType; nRec = lsmGetU16(&aData[SEGMENT_NRECORD_OFFSET(nData)]); iOff = lsmGetU16(&aData[SEGMENT_CELLPTR_OFFSET(nData, nRec-1)]); eType = aData[iOff++]; assert( eType==0 | | | | 2826 2827 2828 2829 2830 2831 2832 2833 2834 2835 2836 2837 2838 2839 2840 2841 | int nKey; int eType; nRec = lsmGetU16(&aData[SEGMENT_NRECORD_OFFSET(nData)]); iOff = lsmGetU16(&aData[SEGMENT_CELLPTR_OFFSET(nData, nRec-1)]); eType = aData[iOff++]; assert( eType==0 || eType==(LSM_SYSTEMKEY|LSM_SEPARATOR) || eType==(LSM_SEPARATOR) ); iOff += lsmVarintGet32(&aData[iOff], &nKey); iOff += lsmVarintGet32(&aData[iOff], &nKey); return iOff + (eType ? nKey : 0); } |
︙ | ︙ | |||
2856 2857 2858 2859 2860 2861 2862 | lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], nRec+1); if( bIndirect ){ aData[iOff++] = 0x00; iOff += lsmVarintPut32(&aData[iOff], iPtr); iOff += lsmVarintPut32(&aData[iOff], iKeyPg); }else{ | | | 3152 3153 3154 3155 3156 3157 3158 3159 3160 3161 3162 3163 3164 3165 3166 | lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], nRec+1); if( bIndirect ){ aData[iOff++] = 0x00; iOff += lsmVarintPut32(&aData[iOff], iPtr); iOff += lsmVarintPut32(&aData[iOff], iKeyPg); }else{ aData[iOff++] = (u8)(iTopic | LSM_SEPARATOR); iOff += lsmVarintPut32(&aData[iOff], iPtr); iOff += lsmVarintPut32(&aData[iOff], nKey); memcpy(&aData[iOff], pKey, nKey); } if( iLevel>0 ){ int iRight = lsmFsPageNumber(p->apHier[iLevel-1]); |
︙ | ︙ | |||
3002 3003 3004 3005 3006 3007 3008 | pSeg = &pMW->pLevel->lhs; pPg = pMW->pPage; aData = fsPageData(pPg, &nData); nRec = pageGetNRec(aData, nData); iFPtr = pageGetPtr(aData, nData); | < < < < < < < | 3298 3299 3300 3301 3302 3303 3304 3305 3306 3307 3308 3309 3310 3311 | pSeg = &pMW->pLevel->lhs; pPg = pMW->pPage; aData = fsPageData(pPg, &nData); nRec = pageGetNRec(aData, nData); iFPtr = pageGetPtr(aData, nData); /* Calculate the relative pointer value to write to this record */ iRPtr = iPtr - iFPtr; /* assert( iRPtr>=0 ); */ /* Figure out how much space is required by the new record. The space ** required is divided into two sections: the header and the body. The ** header consists of the intial varint fields. The body are the blobs |
︙ | ︙ | |||
3036 3037 3038 3039 3040 3041 3042 | nHdr = 1 + lsmVarintLen32(iRPtr) + lsmVarintLen32(nKey); if( rtIsWrite(eType) ) nHdr += lsmVarintLen32(nVal); /* If the entire header will not fit on page pPg, or if page pPg is ** marked read-only, advance to the next page of the output run. */ iOff = pMerge->iOutputOff; if( iOff<0 || iOff+nHdr > SEGMENT_EOF(nData, nRec+1) ){ | | > | 3325 3326 3327 3328 3329 3330 3331 3332 3333 3334 3335 3336 3337 3338 3339 3340 3341 | nHdr = 1 + lsmVarintLen32(iRPtr) + lsmVarintLen32(nKey); if( rtIsWrite(eType) ) nHdr += lsmVarintLen32(nVal); /* If the entire header will not fit on page pPg, or if page pPg is ** marked read-only, advance to the next page of the output run. */ iOff = pMerge->iOutputOff; if( iOff<0 || iOff+nHdr > SEGMENT_EOF(nData, nRec+1) ){ iFPtr = *pCsr->pPrevMergePtr; iRPtr = iPtr - iFPtr; iOff = 0; nRec = 0; rc = mergeWorkerNextPage(pMW, iFPtr); pPg = pMW->pPage; } } |
︙ | ︙ | |||
3211 3212 3213 3214 3215 3216 3217 3218 3219 3220 3221 3222 3223 3224 3225 3226 3227 3228 3229 | iFPtr = pageGetPtr(aData, nData); lsmFsPageRelease(pPg); } } if( rc==LSM_OK ){ rc = mergeWorkerNextPage(pMW, iFPtr); } return rc; } static int mergeWorkerStep(MergeWorker *pMW){ lsm_db *pDb = pMW->pDb; /* Database handle */ MultiCursor *pCsr; /* Cursor to read input data from */ int rc = LSM_OK; /* Return code */ int eType; /* SORTED_SEPARATOR, WRITE or DELETE */ void *pKey; int nKey; /* Key */ Segment *pSeg; /* Output segment */ | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | > < > > > | | | | | | | | | | | | | | | | | | | | > | 3501 3502 3503 3504 3505 3506 3507 3508 3509 3510 3511 3512 3513 3514 3515 3516 3517 3518 3519 3520 3521 3522 3523 3524 3525 3526 3527 3528 3529 3530 3531 3532 3533 3534 3535 3536 3537 3538 3539 3540 3541 3542 3543 3544 3545 3546 3547 3548 3549 3550 3551 3552 3553 3554 3555 3556 3557 3558 3559 3560 3561 3562 3563 3564 3565 3566 3567 3568 3569 3570 3571 3572 3573 3574 3575 3576 3577 3578 3579 3580 3581 3582 3583 3584 3585 3586 3587 3588 3589 3590 3591 3592 3593 3594 3595 3596 3597 3598 3599 3600 3601 3602 3603 3604 3605 3606 3607 3608 3609 3610 3611 3612 3613 3614 3615 3616 3617 3618 3619 3620 3621 3622 3623 3624 3625 3626 3627 | iFPtr = pageGetPtr(aData, nData); lsmFsPageRelease(pPg); } } if( rc==LSM_OK ){ rc = mergeWorkerNextPage(pMW, iFPtr); if( pCsr->pPrevMergePtr ) *pCsr->pPrevMergePtr = iFPtr; } return rc; } /* ** The cursor passed as the first argument is being used as the input for ** a merge operation. When this function is called, *piFlags contains the ** database entry flags for the current entry. The entry about to be written ** to the output. ** */ static void mergeRangeDeletes(MultiCursor *pCsr, int *piFlags){ int f = *piFlags; int iKey = pCsr->aTree[1]; int i; if( pCsr->flags & CURSOR_IGNORE_DELETE ){ /* The ignore-delete flag is set when the output of the merge will form ** the oldest level in the database. In this case there is no point in ** retaining any range-delete flags. */ assert( (f & LSM_POINT_DELETE)==0 ); f &= ~(LSM_START_DELETE|LSM_END_DELETE); }else{ if( iKey==0 ){ int btreeflags = lsmTreeCursorFlags(pCsr->apTreeCsr[1]); if( btreeflags & LSM_END_DELETE ){ f |= (LSM_START_DELETE|LSM_END_DELETE); } } for(i=LSM_MAX(0, iKey+1-CURSOR_DATA_SEGMENT); i<pCsr->nPtr; i++){ if( pCsr->aPtr[i].eType & LSM_END_DELETE ){ f |= (LSM_START_DELETE|LSM_END_DELETE); } } if( (f & LSM_START_DELETE) && (f & LSM_END_DELETE) && (f & LSM_INSERT)==0 ){ f = 0; } } *piFlags = f; } static int mergeWorkerStep(MergeWorker *pMW){ lsm_db *pDb = pMW->pDb; /* Database handle */ MultiCursor *pCsr; /* Cursor to read input data from */ int rc = LSM_OK; /* Return code */ int eType; /* SORTED_SEPARATOR, WRITE or DELETE */ void *pKey; int nKey; /* Key */ Segment *pSeg; /* Output segment */ Pgno iPtr; pCsr = pMW->pCsr; pSeg = &pMW->pLevel->lhs; /* Pull the next record out of the source cursor. */ lsmMCursorKey(pCsr, &pKey, &nKey); eType = pCsr->eType; /* Figure out if the output record may have a different pointer value ** than the previous. This is the case if the current key is identical to ** a key that appears in the lowest level run being merged. If so, set ** iPtr to the absolute pointer value. If not, leave iPtr set to zero, ** indicating that the output pointer value should be a copy of the pointer ** value written with the previous key. */ iPtr = (pCsr->pPrevMergePtr ? *pCsr->pPrevMergePtr : 0); if( pCsr->pBtCsr ){ BtreeCursor *pBtCsr = pCsr->pBtCsr; if( pBtCsr->pKey ){ int res = rtTopic(pBtCsr->eType) - rtTopic(eType); if( res==0 ) res = pDb->xCmp(pBtCsr->pKey, pBtCsr->nKey, pKey, nKey); if( 0==res ) iPtr = pBtCsr->iPtr; assert( res>=0 ); } }else if( pCsr->nPtr ){ SegmentPtr *pPtr = &pCsr->aPtr[pCsr->nPtr-1]; if( pPtr->pPg && 0==pDb->xCmp(pPtr->pKey, pPtr->nKey, pKey, nKey) ){ iPtr = pPtr->iPtr+pPtr->iPgPtr; } } mergeRangeDeletes(pCsr, &eType); if( eType!=0 ){ if( pMW->aGobble ){ int iGobble = pCsr->aTree[1] - CURSOR_DATA_SEGMENT; if( iGobble<pCsr->nPtr ){ SegmentPtr *pGobble = &pCsr->aPtr[iGobble]; if( (pGobble->flags & PGFTR_SKIP_THIS_FLAG)==0 ){ pMW->aGobble[iGobble] = lsmFsPageNumber(pGobble->pPg); } } } /* If this is a separator key and we know that the output pointer has not ** changed, there is no point in writing an output record. Otherwise, ** proceed. */ if( rtIsSeparator(eType)==0 || iPtr!=0 ){ int iSPtr = 0; /* Separators require a pointer here */ if( pMW->pPage==0 ){ rc = mergeWorkerFirstPage(pMW); } /* Write the record into the main run. */ if( rc==LSM_OK ){ rc = mergeWorkerWrite(pMW, eType, pKey, nKey, pCsr, iPtr, &iSPtr); } } } /* Advance the cursor to the next input record (assuming one exists). */ assert( lsmMCursorValid(pMW->pCsr) ); if( rc==LSM_OK ) rc = lsmMCursorNext(pMW->pCsr); |
︙ | ︙ | |||
3341 3342 3343 3344 3345 3346 3347 | int *pnWrite /* OUT: Number of database pages written */ ){ int rc = LSM_OK; /* Return Code */ MultiCursor *pCsr = 0; Level *pNext = 0; /* The current top level */ Level *pNew; /* The new level itself */ Segment *pDel = 0; /* Delete separators from this segment */ | | | 3676 3677 3678 3679 3680 3681 3682 3683 3684 3685 3686 3687 3688 3689 3690 | int *pnWrite /* OUT: Number of database pages written */ ){ int rc = LSM_OK; /* Return Code */ MultiCursor *pCsr = 0; Level *pNext = 0; /* The current top level */ Level *pNew; /* The new level itself */ Segment *pDel = 0; /* Delete separators from this segment */ Pgno iLeftPtr = 0; int nWrite = 0; /* Number of database pages written */ assert( pnOvfl ); /* Allocate the new level structure to write to. */ pNext = lsmDbSnapshotLevel(pDb->pWorker); pNew = (Level *)lsmMallocZeroRc(pDb->pEnv, sizeof(Level), &rc); |
︙ | ︙ | |||
3382 3383 3384 3385 3386 3387 3388 3389 3390 3391 3392 3393 3394 3395 | memset(&merge, 0, sizeof(Merge)); memset(&mergeworker, 0, sizeof(MergeWorker)); pNew->pMerge = &merge; mergeworker.pDb = pDb; mergeworker.pLevel = pNew; mergeworker.pCsr = pCsr; /* Mark the separators array for the new level as a "phantom". */ mergeworker.bFlush = 1; /* Allocate the first page of the output segment. */ rc = mergeWorkerNextPage(&mergeworker, iLeftPtr); | > | 3717 3718 3719 3720 3721 3722 3723 3724 3725 3726 3727 3728 3729 3730 3731 | memset(&merge, 0, sizeof(Merge)); memset(&mergeworker, 0, sizeof(MergeWorker)); pNew->pMerge = &merge; mergeworker.pDb = pDb; mergeworker.pLevel = pNew; mergeworker.pCsr = pCsr; pCsr->pPrevMergePtr = &iLeftPtr; /* Mark the separators array for the new level as a "phantom". */ mergeworker.bFlush = 1; /* Allocate the first page of the output segment. */ rc = mergeWorkerNextPage(&mergeworker, iLeftPtr); |
︙ | ︙ | |||
3413 3414 3415 3416 3417 3418 3419 | } if( rc==LSM_OK ){ sortedInvokeWorkHook(pDb); } #if 0 | | | 3749 3750 3751 3752 3753 3754 3755 3756 3757 3758 3759 3760 3761 3762 3763 | } if( rc==LSM_OK ){ sortedInvokeWorkHook(pDb); } #if 0 lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 0, "new-toplevel"); #endif if( pnWrite ) *pnWrite = nWrite; pDb->pWorker->nWrite += nWrite; return rc; } |
︙ | ︙ | |||
3581 3582 3583 3584 3585 3586 3587 3588 3589 3590 3591 3592 3593 3594 | pMW->pCsr = pCsr; /* Load the current output page into memory. */ if( rc==LSM_OK ) rc = mergeWorkerLoadOutputPage(pMW); /* Position the cursor. */ if( rc==LSM_OK ){ if( pMW->pPage==0 ){ /* The output array is still empty. So position the cursor at the very ** start of the input. */ rc = multiCursorEnd(pCsr, 0); }else{ /* The output array is non-empty. Position the cursor based on the ** page/cell data saved in the Merge.aInput[] array. */ | > | 3917 3918 3919 3920 3921 3922 3923 3924 3925 3926 3927 3928 3929 3930 3931 | pMW->pCsr = pCsr; /* Load the current output page into memory. */ if( rc==LSM_OK ) rc = mergeWorkerLoadOutputPage(pMW); /* Position the cursor. */ if( rc==LSM_OK ){ pCsr->pPrevMergePtr = &pMerge->iCurrentPtr; if( pMW->pPage==0 ){ /* The output array is still empty. So position the cursor at the very ** start of the input. */ rc = multiCursorEnd(pCsr, 0); }else{ /* The output array is non-empty. Position the cursor based on the ** page/cell data saved in the Merge.aInput[] array. */ |
︙ | ︙ | |||
3786 3787 3788 3789 3790 3791 3792 | SegmentPtr *pGobble = &mergeworker.pCsr->aPtr[i]; if( pGobble->pSeg->iRoot ){ rc = sortedBtreeGobble(pDb, mergeworker.pCsr, i); }else if( mergeworker.aGobble[i] ){ lsmFsGobble(pDb, pGobble->pSeg, &mergeworker.aGobble[i], 1); } } | < < < < < < < < < < < < < | 4123 4124 4125 4126 4127 4128 4129 4130 4131 4132 4133 4134 4135 4136 | SegmentPtr *pGobble = &mergeworker.pCsr->aPtr[i]; if( pGobble->pSeg->iRoot ){ rc = sortedBtreeGobble(pDb, mergeworker.pCsr, i); }else if( mergeworker.aGobble[i] ){ lsmFsGobble(pDb, pGobble->pSeg, &mergeworker.aGobble[i], 1); } } }else if( pLevel->lhs.iFirst==0 ){ /* If the new level is completely empty, remove it from the ** database snapshot. This can only happen if all input keys were ** annihilated. Since keys are only annihilated if the new level ** is the last in the linked list (contains the most ancient of ** database content), this guarantees that pLevel->pNext==0. */ |
︙ | ︙ | |||
3848 3849 3850 3851 3852 3853 3854 3855 3856 3857 3858 | /* Clean up the MergeWorker object initialized above. If no error ** has occurred, invoke the work-hook to inform the application that ** the database structure has changed. */ mergeWorkerShutdown(&mergeworker, &rc); if( rc==LSM_OK ) sortedInvokeWorkHook(pDb); /* If bFlush is true and the database is no longer considered "full", ** break out of the loop even if nRemaining is still greater than ** zero. The caller has an in-memory tree to flush to disk. */ if( bFlush && sortedDbIsFull(pDb)==0 ) break; | > > > > > < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < | 4172 4173 4174 4175 4176 4177 4178 4179 4180 4181 4182 4183 4184 4185 4186 4187 4188 4189 4190 4191 4192 4193 4194 4195 4196 4197 4198 4199 4200 4201 4202 4203 4204 4205 4206 | /* Clean up the MergeWorker object initialized above. If no error ** has occurred, invoke the work-hook to inform the application that ** the database structure has changed. */ mergeWorkerShutdown(&mergeworker, &rc); if( rc==LSM_OK ) sortedInvokeWorkHook(pDb); #if 0 lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 0, "work"); #endif assertRunInOrder(pDb, &pLevel->lhs); /* If bFlush is true and the database is no longer considered "full", ** break out of the loop even if nRemaining is still greater than ** zero. The caller has an in-memory tree to flush to disk. */ if( bFlush && sortedDbIsFull(pDb)==0 ) break; } } if( pnWrite ) *pnWrite = (nWork - nRemaining); pWorker->nWrite += (nWork - nRemaining); #ifdef LSM_LOG_WORK lsmLogMessage(pDb, rc, "sortedWork(): %d pages", (nWork-nRemaining)); #endif return rc; } /* ** The database connection passed as the first argument must be a worker ** connection. This function checks if there exists an "old" in-memory tree ** ready to be flushed to disk. If so, *pbOut is set to true before ** returning. Otherwise false. ** ** Normally, LSM_OK is returned. Or, if an error occurs, an LSM error code. |
︙ | ︙ | |||
4354 4355 4356 4357 4358 4359 4360 | }else{ lsmStringAppendf(pStr, "%c", isalnum(z[iChar]) ?z[iChar] : '.'); } } return LSM_OK; } | > > > | > > > > > > > > > > | 4633 4634 4635 4636 4637 4638 4639 4640 4641 4642 4643 4644 4645 4646 4647 4648 4649 4650 4651 4652 4653 4654 4655 4656 4657 4658 4659 4660 4661 4662 4663 4664 | }else{ lsmStringAppendf(pStr, "%c", isalnum(z[iChar]) ?z[iChar] : '.'); } } return LSM_OK; } #define INFO_PAGE_DUMP_DATA 0x01 #define INFO_PAGE_DUMP_VALUES 0x02 #define INFO_PAGE_DUMP_HEX 0x04 static int infoPageDump( lsm_db *pDb, /* Database handle */ Pgno iPg, /* Page number of page to dump */ int flags, char **pzOut /* OUT: lsmMalloc'd string */ ){ int rc = LSM_OK; /* Return code */ Page *pPg = 0; /* Handle for page iPg */ int i, j; /* Loop counters */ const int perLine = 16; /* Bytes per line in the raw hex dump */ int bValues = (flags & INFO_PAGE_DUMP_VALUES); int bHex = (flags & INFO_PAGE_DUMP_HEX); int bData = (flags & INFO_PAGE_DUMP_DATA); *pzOut = 0; if( iPg==0 ) return LSM_ERROR; rc = lsmFsDbPageGet(pDb->pFS, iPg, &pPg); if( rc==LSM_OK ){ Blob blob = {0, 0, 0, 0}; |
︙ | ︙ | |||
4400 4401 4402 4403 4404 4405 4406 4407 4408 4409 4410 4411 4412 | for(iCell=0; iCell<nRec; iCell++){ u8 *aKey; int nKey = 0; /* Key */ u8 *aVal; int nVal = 0; /* Value */ int iPgPtr; int eType; char cType = '?'; Pgno iAbsPtr; infoCellDump(pDb, pPg, iCell, &eType, &iPgPtr, &aKey, &nKey, &aVal, &nVal, &blob ); iAbsPtr = iPgPtr + ((flags & SEGMENT_BTREE_FLAG) ? 0 : iPtr); | > < | < | | | > | | | | | | | | | | | | | | | | | | | | | | > > > > > > > > > > > > > > > > > > > | 4692 4693 4694 4695 4696 4697 4698 4699 4700 4701 4702 4703 4704 4705 4706 4707 4708 4709 4710 4711 4712 4713 4714 4715 4716 4717 4718 4719 4720 4721 4722 4723 4724 4725 4726 4727 4728 4729 4730 4731 4732 4733 4734 4735 4736 4737 4738 4739 4740 4741 4742 4743 4744 4745 4746 4747 4748 4749 4750 4751 4752 4753 4754 4755 4756 4757 4758 4759 4760 4761 4762 4763 4764 4765 4766 4767 4768 4769 4770 4771 4772 4773 4774 4775 4776 4777 4778 4779 4780 4781 4782 4783 4784 4785 4786 4787 4788 4789 4790 4791 | for(iCell=0; iCell<nRec; iCell++){ u8 *aKey; int nKey = 0; /* Key */ u8 *aVal; int nVal = 0; /* Value */ int iPgPtr; int eType; char cType = '?'; Pgno iAbsPtr; char zFlags[8]; infoCellDump(pDb, pPg, iCell, &eType, &iPgPtr, &aKey, &nKey, &aVal, &nVal, &blob ); iAbsPtr = iPgPtr + ((flags & SEGMENT_BTREE_FLAG) ? 0 : iPtr); lsmFlagsToString(eType, zFlags); lsmStringAppendf(&str, "%s %d (%s) ", zFlags, iAbsPtr, (rtTopic(eType) ? "sys" : "usr") ); infoAppendBlob(&str, bHex, aKey, nKey); if( nVal>0 && bValues ){ lsmStringAppendf(&str, "%*s", nKeyWidth - (nKey*(1+bHex)), ""); lsmStringAppendf(&str, " "); infoAppendBlob(&str, bHex, aVal, nVal); } lsmStringAppendf(&str, "\n"); } if( bData ){ lsmStringAppendf(&str, "\n-------------------" "-------------------------------------------------------------\n"); lsmStringAppendf(&str, "Page %d\n", iPg, (iPg-1)*nData, iPg*nData - 1); for(i=0; i<nData; i += perLine){ lsmStringAppendf(&str, "%04x: ", i); for(j=0; j<perLine; j++){ if( i+j>nData ){ lsmStringAppendf(&str, " "); }else{ lsmStringAppendf(&str, "%02x ", aData[i+j]); } } lsmStringAppendf(&str, " "); for(j=0; j<perLine; j++){ if( i+j>nData ){ lsmStringAppendf(&str, " "); }else{ lsmStringAppendf(&str,"%c", isprint(aData[i+j]) ? aData[i+j] : '.'); } } lsmStringAppendf(&str,"\n"); } } *pzOut = str.z; sortedBlobFree(&blob); lsmFsPageRelease(pPg); } return rc; } int lsmInfoPageDump( lsm_db *pDb, /* Database handle */ Pgno iPg, /* Page number of page to dump */ int bHex, /* True to output key/value in hex form */ char **pzOut /* OUT: lsmMalloc'd string */ ){ int flags = INFO_PAGE_DUMP_DATA | INFO_PAGE_DUMP_VALUES; if( bHex ) flags |= INFO_PAGE_DUMP_HEX; return infoPageDump(pDb, iPg, flags, pzOut); } void sortedDumpSegment(lsm_db *pDb, Segment *pRun, int bVals){ assert( pDb->xLog ); if( pRun && pRun->iFirst ){ int flags = (bVals ? INFO_PAGE_DUMP_VALUES : 0); char *zSeg; Page *pPg; zSeg = segToString(pDb->pEnv, pRun, 0); lsmLogMessage(pDb, LSM_OK, "Segment: %s", zSeg); lsmFree(pDb->pEnv, zSeg); lsmFsDbPageGet(pDb->pFS, pRun->iFirst, &pPg); while( pPg ){ Page *pNext; char *z = 0; infoPageDump(pDb, lsmFsPageNumber(pPg), flags, &z); lsmLogMessage(pDb, LSM_OK, "%s", z); lsmFree(pDb->pEnv, z); #if 0 sortedDumpPage(pDb, pRun, pPg, bVals); #endif lsmFsDbPageNext(pRun, pPg, 1, &pNext); lsmFsPageRelease(pPg); pPg = pNext; } } } |
︙ | ︙ |
Changes to src/lsm_tree.c.
︙ | ︙ | |||
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 | struct TreeOld { u32 iShmid; /* Last shared-memory chunk in use by old */ u32 iRoot; /* Offset of root node in shm file */ u32 nHeight; /* Height of tree structure */ }; /* ** Container for a key-value pair. Within the *-shm file, each key/value ** pair is stored in a single allocation (which may not actually be ** contiguous in memory). Layout is the TreeKey structure, followed by ** the nKey bytes of key blob, followed by the nValue bytes of value blob ** (if nValue is non-negative). */ struct TreeKey { int nKey; /* Size of pKey in bytes */ int nValue; /* Size of pValue. Or negative. */ }; #define TK_KEY(p) ((void *)&(p)[1]) #define TK_VAL(p) ((void *)(((u8 *)&(p)[1]) + (p)->nKey)) /* ** A single tree node. A node structure may contain up to 3 key/value | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 | struct TreeOld { u32 iShmid; /* Last shared-memory chunk in use by old */ u32 iRoot; /* Offset of root node in shm file */ u32 nHeight; /* Height of tree structure */ }; #ifndef NDEBUG /* ** assert() that a TreeKey.flags value is sane. Usage: ** ** assert( assertFlagsOk(pTreeKey->flags) ); */ static int assertFlagsOk(u8 keyflags){ /* At least one flag must be set. Otherwise, what is this key doing? */ assert( keyflags!=0 ); /* The POINT_DELETE and INSERT flags cannot both be set. */ assert( (keyflags & LSM_POINT_DELETE)==0 || (keyflags & LSM_INSERT)==0 ); /* If both the START_DELETE and END_DELETE flags are set, then the INSERT ** flag must also be set. In other words - the three DELETE flags cannot ** all be set */ assert( (keyflags & LSM_END_DELETE)==0 || (keyflags & LSM_START_DELETE)==0 || (keyflags & LSM_POINT_DELETE)==0 ); return 1; } static int assert_delete_ranges_match(lsm_db *); static int treeCountEntries(lsm_db *db); #else # define assertFlagsOk(x) #endif /* ** Container for a key-value pair. Within the *-shm file, each key/value ** pair is stored in a single allocation (which may not actually be ** contiguous in memory). Layout is the TreeKey structure, followed by ** the nKey bytes of key blob, followed by the nValue bytes of value blob ** (if nValue is non-negative). */ struct TreeKey { int nKey; /* Size of pKey in bytes */ int nValue; /* Size of pValue. Or negative. */ u8 flags; /* Various LSM_XXX flags */ }; #define TK_KEY(p) ((void *)&(p)[1]) #define TK_VAL(p) ((void *)(((u8 *)&(p)[1]) + (p)->nKey)) /* ** A single tree node. A node structure may contain up to 3 key/value |
︙ | ︙ | |||
281 282 283 284 285 286 287 288 289 290 291 292 293 294 | return (ShmChunk *)treeShmptr(pDb, iChunk*LSM_SHM_CHUNK_SIZE, &rcdummy); } static ShmChunk * treeShmChunkRc(lsm_db *pDb, int iChunk, int *pRc){ return (ShmChunk *)treeShmptr(pDb, iChunk*LSM_SHM_CHUNK_SIZE, pRc); } /* Values for the third argument to treeShmkey(). */ #define TK_LOADKEY 1 #define TK_LOADVAL 2 static TreeKey *treeShmkey( lsm_db *pDb, /* Database handle */ u32 iPtr, /* Shmptr to TreeKey struct */ | > > > > > > > > > > > > > > > > > > | 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 | return (ShmChunk *)treeShmptr(pDb, iChunk*LSM_SHM_CHUNK_SIZE, &rcdummy); } static ShmChunk * treeShmChunkRc(lsm_db *pDb, int iChunk, int *pRc){ return (ShmChunk *)treeShmptr(pDb, iChunk*LSM_SHM_CHUNK_SIZE, pRc); } #ifndef NDEBUG static void assertIsWorkingChild( lsm_db *db, TreeNode *pNode, TreeNode *pParent, int iCell ){ TreeNode *p; int rc = LSM_OK; u32 iPtr = getChildPtr(pParent, WORKING_VERSION, iCell); p = treeShmptr(db, iPtr, &rc); assert( p==pNode || rc!=LSM_OK ); } #else # define assertIsWorkingChild(w,x,y,z) #endif /* Values for the third argument to treeShmkey(). */ #define TK_LOADKEY 1 #define TK_LOADVAL 2 static TreeKey *treeShmkey( lsm_db *pDb, /* Database handle */ u32 iPtr, /* Shmptr to TreeKey struct */ |
︙ | ︙ | |||
362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 | ** * todo... */ void assert_tree_looks_ok(int rc, Tree *pTree){ } #else # define assert_tree_looks_ok(x,y) #endif #ifdef LSM_DEBUG /* ** Pointer pBlob points to a buffer containing a blob of binary data ** nBlob bytes long. Append the contents of this blob to *pStr, with ** each octet represented by a 2-digit hexadecimal number. For example, ** if the input blob is three bytes in size and contains {0x01, 0x44, 0xFF}, ** then "0144ff" is appended to *pStr. */ static void lsmAppendStrBlob(LsmString *pStr, void *pBlob, int nBlob){ int i; lsmStringExtend(pStr, nBlob*2); if( pStr->nAlloc==0 ) return; for(i=0; i<nBlob; i++){ u8 c = ((u8*)pBlob)[i]; if( c>='a' && c<='z' ){ pStr->z[pStr->n++] = c; | > > > > > > > > > > > > > > > > > > > | > > > > > > > > > | | > > > > > | | < < < | | | | | > | | | | | > > | | | | > > > | | > > > > > > > > > > > > | | 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 | ** * todo... */ void assert_tree_looks_ok(int rc, Tree *pTree){ } #else # define assert_tree_looks_ok(x,y) #endif void lsmFlagsToString(int flags, char *zFlags){ zFlags[0] = (flags & LSM_END_DELETE) ? ']' : '.'; /* Only one of LSM_POINT_DELETE, LSM_INSERT and LSM_SEPARATOR should ever ** be set. If this is not true, write a '?' to the output. */ switch( flags & (LSM_POINT_DELETE|LSM_INSERT|LSM_SEPARATOR) ){ case 0: zFlags[1] = '.'; break; case LSM_POINT_DELETE: zFlags[1] = '-'; break; case LSM_INSERT: zFlags[1] = '+'; break; case LSM_SEPARATOR: zFlags[1] = '^'; break; default: zFlags[1] = '?'; break; } zFlags[2] = (flags & LSM_SYSTEMKEY) ? '*' : '.'; zFlags[3] = (flags & LSM_START_DELETE) ? '[' : '.'; zFlags[4] = '\0'; } #ifdef LSM_DEBUG /* ** Pointer pBlob points to a buffer containing a blob of binary data ** nBlob bytes long. Append the contents of this blob to *pStr, with ** each octet represented by a 2-digit hexadecimal number. For example, ** if the input blob is three bytes in size and contains {0x01, 0x44, 0xFF}, ** then "0144ff" is appended to *pStr. */ static void lsmAppendStrBlob(LsmString *pStr, void *pBlob, int nBlob){ int i; lsmStringExtend(pStr, nBlob*2); if( pStr->nAlloc==0 ) return; for(i=0; i<nBlob; i++){ u8 c = ((u8*)pBlob)[i]; if( c>='a' && c<='z' ){ pStr->z[pStr->n++] = c; }else if( c!=0 || nBlob==1 || i!=(nBlob-1) ){ pStr->z[pStr->n++] = "0123456789abcdef"[(c>>4)&0xf]; pStr->z[pStr->n++] = "0123456789abcdef"[c&0xf]; } } pStr->z[pStr->n] = 0; } /* ** Append nIndent space (0x20) characters to string *pStr. */ static void lsmAppendIndent(LsmString *pStr, int nIndent){ int i; lsmStringExtend(pStr, nIndent); for(i=0; i<nIndent; i++) lsmStringAppend(pStr, " ", 1); } static void strAppendFlags(LsmString *pStr, u8 flags){ char zFlags[8]; lsmFlagsToString(flags, zFlags); zFlags[4] = ':'; lsmStringAppend(pStr, zFlags, 5); } void dump_node_contents( lsm_db *pDb, u32 iNode, /* Print out the contents of this node */ char *zPath, /* Path from root to this node */ int nPath, /* Number of bytes in zPath */ int nHeight /* Height: (0==leaf) (1==parent-of-leaf) */ ){ const char *zSpace = " "; int i; int rc = LSM_OK; LsmString s; TreeNode *pNode; TreeBlob b = {0, 0}; pNode = (TreeNode *)treeShmptr(pDb, iNode, &rc); if( nHeight==0 ){ /* Append the nIndent bytes of space to string s. */ lsmStringInit(&s, pDb->pEnv); /* Append each key to string s. */ for(i=0; i<3; i++){ u32 iPtr = pNode->aiKeyPtr[i]; if( iPtr ){ TreeKey *pKey = treeShmkey(pDb, pNode->aiKeyPtr[i], TK_LOADKEY, &b,&rc); strAppendFlags(&s, pKey->flags); lsmAppendStrBlob(&s, TK_KEY(pKey), pKey->nKey); lsmStringAppend(&s, " ", -1); } } printf("% 6d %.*sleaf%.*s: %s\n", iNode, nPath, zPath, 20-nPath-4, zSpace, s.z ); lsmStringClear(&s); }else{ for(i=0; i<4 && nHeight>0; i++){ u32 iPtr = getChildPtr(pNode, pDb->treehdr.root.iTransId, i); zPath[nPath] = i+'0'; zPath[nPath+1] = '/'; if( iPtr ){ dump_node_contents(pDb, iPtr, zPath, nPath+2, nHeight-1); } if( i!=3 && pNode->aiKeyPtr[i] ){ TreeKey *pKey = treeShmkey(pDb, pNode->aiKeyPtr[i], TK_LOADKEY, &b,&rc); lsmStringInit(&s, pDb->pEnv); strAppendFlags(&s, pKey->flags); lsmAppendStrBlob(&s, TK_KEY(pKey), pKey->nKey); printf("% 6d %.*s%.*s: %s\n", iNode, nPath+1, zPath, 20-nPath-1, zSpace, s.z); lsmStringClear(&s); } } } tblobFree(pDb, &b); } void dump_tree_contents(lsm_db *pDb, const char *zCaption){ char zPath[64]; TreeRoot *p = &pDb->treehdr.root; printf("\n%s\n", zCaption); zPath[0] = '/'; if( p->iRoot ){ dump_node_contents(pDb, p->iRoot, zPath, 1, p->nHeight-1); } fflush(stdout); } #endif /* |
︙ | ︙ | |||
469 470 471 472 473 474 475 | } /* ** Return a pointer to the mapping of the TreeKey object that the cursor ** is pointing to. */ static TreeKey *csrGetKey(TreeCursor *pCsr, TreeBlob *pBlob, int *pRc){ | | > > | 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 | } /* ** Return a pointer to the mapping of the TreeKey object that the cursor ** is pointing to. */ static TreeKey *csrGetKey(TreeCursor *pCsr, TreeBlob *pBlob, int *pRc){ TreeKey *pRet = (TreeKey *)treeShmkey(pCsr->pDb, pCsr->apTreeNode[pCsr->iNode]->aiKeyPtr[pCsr->aiCell[pCsr->iNode]], TK_LOADVAL, pBlob, pRc ); assert( pRet==0 || assertFlagsOk(pRet->flags) ); return pRet; } /* ** Save the current position of tree cursor pCsr. */ int lsmTreeCursorSave(TreeCursor *pCsr){ int rc = LSM_OK; |
︙ | ︙ | |||
1074 1075 1076 1077 1078 1079 1080 | u32 iShmid; ShmChunk *p; p = treeShmChunkRc(db, db->treehdr.iFirst, &rc); iShmid = p->iShmid; while( rc==LSM_OK && p ){ if( p->iNext ){ | > > > | | | | | | > | 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 | u32 iShmid; ShmChunk *p; p = treeShmChunkRc(db, db->treehdr.iFirst, &rc); iShmid = p->iShmid; while( rc==LSM_OK && p ){ if( p->iNext ){ if( p->iNext>=db->treehdr.nChunk ){ rc = LSM_CORRUPT_BKPT; }else{ ShmChunk *pNext = treeShmChunkRc(db, p->iNext, &rc); if( rc==LSM_OK ){ if( pNext->iShmid!=p->iShmid+1 ){ rc = LSM_CORRUPT_BKPT; } p = pNext; } } }else{ p = 0; } nVisit++; } |
︙ | ︙ | |||
1163 1164 1165 1166 1167 1168 1169 | nSort = 1; while( nSort < (db->treehdr.nChunk-1) ) nSort = nSort * 2; nByte = sizeof(ShmChunkLoc) * nSort * 2; aSort = lsmMallocZeroRc(db->pEnv, nByte, &rc); iPrevShmid = pMin->iShmid; /* Fix all shm-ids, if required. */ | | > | | > | 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 | nSort = 1; while( nSort < (db->treehdr.nChunk-1) ) nSort = nSort * 2; nByte = sizeof(ShmChunkLoc) * nSort * 2; aSort = lsmMallocZeroRc(db->pEnv, nByte, &rc); iPrevShmid = pMin->iShmid; /* Fix all shm-ids, if required. */ if( rc==LSM_OK ){ iPrevShmid = pMin->iShmid-1; for(i=1; i<db->treehdr.nChunk; i++){ p = treeShmChunk(db, i); aSort[i-1].pShm = p; aSort[i-1].iLoc = i; if( i!=db->treehdr.iFirst ){ if( shm_sequence_ge(p->iShmid, db->treehdr.iNextShmid) ){ p->iShmid = iPrevShmid--; } } } if( iMin!=db->treehdr.iFirst ){ p = treeShmChunk(db, db->treehdr.iFirst); p->iShmid = iPrevShmid; } } if( rc==LSM_OK ){ ShmChunkLoc *aSpace = &aSort[nSort]; for(i=0; i<nSort; i++){ if( aSort[i].pShm ){ assert( shm_sequence_ge(aSort[i].pShm->iShmid, iPrevShmid) ); |
︙ | ︙ | |||
1242 1243 1244 1245 1246 1247 1248 | rc = treeRepairList(db); } memcpy(&db->treehdr, &hdr, sizeof(TreeHeader)); return rc; } | > > > > > > > | > > > > > > | > > > | > > > > > > > | > > > > | > > | > > > > > > > > > > > | > > > > > > > > > > > > > > > > > > > > > > > > > > | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > < < < < < < < < < < < < < < < < < < < < < < < < < < | < < > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764 1765 1766 1767 1768 1769 1770 1771 1772 1773 1774 1775 1776 1777 1778 1779 1780 1781 1782 1783 1784 1785 1786 1787 1788 1789 1790 1791 1792 1793 1794 1795 1796 1797 1798 1799 1800 1801 1802 1803 1804 1805 1806 1807 1808 1809 1810 1811 1812 1813 1814 1815 1816 1817 1818 1819 1820 1821 1822 1823 1824 1825 1826 1827 1828 1829 1830 1831 1832 1833 1834 1835 1836 1837 1838 1839 1840 1841 1842 1843 1844 1845 1846 1847 1848 1849 1850 1851 1852 1853 1854 1855 1856 1857 1858 1859 1860 1861 1862 1863 1864 1865 1866 1867 1868 1869 1870 1871 1872 1873 1874 1875 1876 1877 1878 1879 1880 1881 1882 1883 1884 1885 1886 1887 1888 1889 1890 1891 1892 1893 1894 1895 1896 1897 1898 | rc = treeRepairList(db); } memcpy(&db->treehdr, &hdr, sizeof(TreeHeader)); return rc; } static void treeOverwriteKey(lsm_db *db, TreeCursor *pCsr, u32 iKey, int *pRc){ if( *pRc==LSM_OK ){ TreeRoot *p = &db->treehdr.root; TreeNode *pNew; u32 iNew; TreeNode *pNode = pCsr->apTreeNode[pCsr->iNode]; int iCell = pCsr->aiCell[pCsr->iNode]; /* Create a copy of this node */ if( (pCsr->iNode>0 && pCsr->iNode==(p->nHeight-1)) ){ pNew = copyTreeLeaf(db, (TreeLeaf *)pNode, &iNew, pRc); }else{ pNew = copyTreeNode(db, pNode, &iNew, pRc); } if( pNew ){ /* Modify the value in the new version */ pNew->aiKeyPtr[iCell] = iKey; /* Change the pointer in the parent (if any) to point at the new ** TreeNode */ pCsr->iNode--; treeUpdatePtr(db, pCsr, iNew); } } } static int treeNextIsEndDelete(lsm_db *db, TreeCursor *pCsr){ TreeNode *pNode; int iNode = pCsr->iNode; int iCell = pCsr->aiCell[iNode]+1; /* Cursor currently points to a leaf node. */ assert( pCsr->iNode==(db->treehdr.root.nHeight-1) ); while( iNode>=0 ){ TreeNode *pNode = pCsr->apTreeNode[iNode]; if( iCell<3 && pNode->aiKeyPtr[iCell] ){ int rc = LSM_OK; TreeKey *pKey = treeShmptr(db, pNode->aiKeyPtr[iCell], &rc); assert( rc==LSM_OK ); return ((pKey->flags & LSM_END_DELETE) ? 1 : 0); } iNode--; iCell = pCsr->aiCell[iNode]; } return 0; } static int treePrevIsStartDelete(lsm_db *db, TreeCursor *pCsr){ TreeNode *pNode; int iNode = pCsr->iNode; /* Cursor currently points to a leaf node. */ assert( pCsr->iNode==(db->treehdr.root.nHeight-1) ); while( iNode>=0 ){ TreeNode *pNode = pCsr->apTreeNode[iNode]; int iCell = pCsr->aiCell[iNode]-1; if( iCell>=0 && pNode->aiKeyPtr[iCell] ){ int rc = LSM_OK; TreeKey *pKey = treeShmptr(db, pNode->aiKeyPtr[iCell], &rc); assert( rc==LSM_OK ); return ((pKey->flags & LSM_START_DELETE) ? 1 : 0); } iNode--; } return 0; } static int treeInsertEntry( lsm_db *pDb, /* Database handle */ int flags, /* Flags associated with entry */ void *pKey, /* Pointer to key data */ int nKey, /* Size of key data in bytes */ void *pVal, /* Pointer to value data (or NULL) */ int nVal /* Bytes in value data (or -ve for delete) */ ){ int rc = LSM_OK; /* Return Code */ TreeKey *pTreeKey; /* New key-value being inserted */ u32 iTreeKey; TreeRoot *p = &pDb->treehdr.root; TreeCursor csr; /* Cursor to seek to pKey/nKey */ int res; /* Result of seek operation on csr */ assert( nVal>=0 || pVal==0 ); assert_tree_looks_ok(LSM_OK, pTree); assert( flags==LSM_INSERT || flags==LSM_POINT_DELETE || flags==LSM_START_DELETE || flags==LSM_END_DELETE ); #if 0 dump_tree_contents(pDb, "before"); #endif if( p->iRoot ){ TreeKey *pRes; /* Key at end of seek operation */ treeCursorInit(pDb, 0, &csr); /* Seek to the leaf (or internal node) that the new key belongs on */ rc = lsmTreeCursorSeek(&csr, pKey, nKey, &res); pRes = csrGetKey(&csr, &csr.blob, &rc); if( rc!=LSM_OK ) return rc; if( flags==LSM_START_DELETE ){ /* When inserting a start-delete-range entry, if the key that ** occurs immediately before the new entry is already a START_DELETE, ** then the new entry is not required. */ if( (res<=0 && (pRes->flags & LSM_START_DELETE)) || (res>0 && treePrevIsStartDelete(pDb, &csr)) ){ goto insert_entry_out; } }else if( flags==LSM_END_DELETE ){ /* When inserting an start-delete-range entry, if the key that ** occurs immediately after the new entry is already an END_DELETE, ** then the new entry is not required. */ if( (res<0 && treeNextIsEndDelete(pDb, &csr)) || (res>=0 && (pRes->flags & LSM_END_DELETE)) ){ goto insert_entry_out; } } if( res==0 && (flags & (LSM_END_DELETE|LSM_START_DELETE)) ){ if( pRes->flags & LSM_INSERT ){ nVal = pRes->nValue; pVal = TK_VAL(pRes); } flags = flags | pRes->flags; } if( flags & (LSM_INSERT|LSM_POINT_DELETE) ){ if( (res<0 && (pRes->flags & LSM_START_DELETE)) || (res>0 && (pRes->flags & LSM_END_DELETE)) ){ flags = flags | (LSM_END_DELETE|LSM_START_DELETE); }else if( res==0 ){ flags = flags | (pRes->flags & (LSM_END_DELETE|LSM_START_DELETE)); } } }else{ memset(&csr, 0, sizeof(TreeCursor)); } /* Allocate and populate a new key-value pair structure */ pTreeKey = newTreeKey(pDb, &iTreeKey, pKey, nKey, pVal, nVal, &rc); if( rc!=LSM_OK ) return rc; pTreeKey->flags = flags; if( p->iRoot==0 ){ /* The tree is completely empty. Add a new root node and install ** (pKey/nKey) as the middle entry. Even though it is a leaf at the ** moment, use newTreeNode() to allocate the node (i.e. allocate enough ** space for the fields used by interior nodes). This is because the ** treeInsert() routine may convert this node to an interior node. */ TreeNode *pRoot = newTreeNode(pDb, &p->iRoot, &rc); if( rc==LSM_OK ){ assert( p->nHeight==0 ); pRoot->aiKeyPtr[1] = iTreeKey; p->nHeight = 1; } }else{ if( res==0 ){ /* The search found a match within the tree. */ treeOverwriteKey(pDb, &csr, iTreeKey, &rc); }else{ /* The cursor now points to the leaf node into which the new entry should ** be inserted. There may or may not be a free slot within the leaf for ** the new key-value pair. ** ** iSlot is set to the index of the key within pLeaf that the new key ** should be inserted to the left of (or to a value 1 greater than the ** index of the rightmost key if the new key is larger than all keys ** currently stored in the node). */ int iSlot = csr.aiCell[csr.iNode] + (res<0); if( csr.iNode==0 ){ rc = treeInsert(pDb, &csr, 0, iTreeKey, 0, iSlot); }else{ rc = treeInsertLeaf(pDb, &csr, iTreeKey, iSlot); } } } #if 0 dump_tree_contents(pDb, "after"); #endif insert_entry_out: tblobFree(pDb, &csr.blob); assert_tree_looks_ok(rc, pTree); return rc; } /* ** Insert a new entry into the in-memory tree. ** ** If the value of the 5th parameter, nVal, is negative, then a delete-marker ** is inserted into the tree. In this case the value pointer, pVal, must be ** NULL. */ int lsmTreeInsert( lsm_db *pDb, /* Database handle */ void *pKey, /* Pointer to key data */ int nKey, /* Size of key data in bytes */ void *pVal, /* Pointer to value data (or NULL) */ int nVal /* Bytes in value data (or -ve for delete) */ ){ int flags; if( nVal<0 ){ flags = LSM_POINT_DELETE; }else{ flags = LSM_INSERT; } return treeInsertEntry(pDb, flags, pKey, nKey, pVal, nVal); } static int treeDeleteEntry(lsm_db *db, TreeCursor *pCsr, u32 iNewptr){ TreeRoot *p = &db->treehdr.root; TreeNode *pNode = pCsr->apTreeNode[pCsr->iNode]; int iSlot = pCsr->aiCell[pCsr->iNode]; int bLeaf; int rc = LSM_OK; assert( pNode->aiKeyPtr[1] ); assert( pNode->aiKeyPtr[iSlot] ); assert( iSlot==0 || iSlot==1 || iSlot==2 ); assert( (pCsr->iNode==(db->treehdr.root.nHeight-1))==(iNewptr==0) ); bLeaf = (pCsr->iNode==(p->nHeight-1) && p->nHeight>1); if( pNode->aiKeyPtr[0] || pNode->aiKeyPtr[2] ){ /* There are currently at least 2 keys on this node. So just create ** a new copy of the node with one of the keys removed. If the node ** happens to be the root node of the tree, allocate an entire ** TreeNode structure instead of just a TreeLeaf. */ TreeNode *pNew; u32 iNew; if( bLeaf ){ pNew = (TreeNode *)newTreeLeaf(db, &iNew, &rc); }else{ pNew = newTreeNode(db, &iNew, &rc); } if( pNew ){ int i; int iOut = 1; for(i=0; i<4; i++){ if( i==iSlot ){ i++; if( bLeaf==0 ) pNew->aiChildPtr[iOut] = iNewptr; if( i<3 ) pNew->aiKeyPtr[iOut] = pNode->aiKeyPtr[i]; iOut++; }else if( bLeaf || p->nHeight==1 ){ if( i<3 && pNode->aiKeyPtr[i] ){ pNew->aiKeyPtr[iOut++] = pNode->aiKeyPtr[i]; } }else{ if( getChildPtr(pNode, WORKING_VERSION, i) ){ pNew->aiChildPtr[iOut] = getChildPtr(pNode, WORKING_VERSION, i); if( i<3 ) pNew->aiKeyPtr[iOut] = pNode->aiKeyPtr[i]; iOut++; } } } assert( iOut<=4 ); assert( bLeaf || pNew->aiChildPtr[0]==0 ); pCsr->iNode--; rc = treeUpdatePtr(db, pCsr, iNew); } }else if( pCsr->iNode==0 ){ /* Removing the only key in the root node. iNewptr is the new root. */ assert( iSlot==1 ); db->treehdr.root.iRoot = iNewptr; db->treehdr.root.nHeight--; }else{ /* There is only one key on this node and the node is not the root ** node. Find a peer for this node. Then redistribute the contents of ** the peer and the parent cell between the parent and either one or ** two new nodes. */ TreeNode *pParent; /* Parent tree node */ int iPSlot; u32 iPeer; /* Pointer to peer leaf node */ int iDir; TreeNode *pPeer; /* The peer leaf node */ TreeNode *pNew1; u32 iNew1; /* First new leaf node */ assert( iSlot==1 ); pParent = pCsr->apTreeNode[pCsr->iNode-1]; iPSlot = pCsr->aiCell[pCsr->iNode-1]; if( iPSlot>0 && getChildPtr(pParent, WORKING_VERSION, iPSlot-1) ){ iDir = -1; }else{ iDir = +1; } iPeer = getChildPtr(pParent, WORKING_VERSION, iPSlot+iDir); pPeer = (TreeNode *)treeShmptr(db, iPeer, &rc); assertIsWorkingChild(db, pNode, pParent, iPSlot); /* Allocate the first new leaf node. This is always required. */ if( bLeaf ){ pNew1 = (TreeNode *)newTreeLeaf(db, &iNew1, &rc); }else{ pNew1 = (TreeNode *)newTreeNode(db, &iNew1, &rc); } if( pPeer->aiKeyPtr[0] && pPeer->aiKeyPtr[2] ){ /* Peer node is completely full. This means that two new leaf nodes ** and a new parent node are required. */ TreeNode *pNew2; u32 iNew2; /* Second new leaf node */ TreeNode *pNewP; u32 iNewP; /* New parent node */ if( bLeaf ){ pNew2 = (TreeNode *)newTreeLeaf(db, &iNew2, &rc); }else{ pNew2 = (TreeNode *)newTreeNode(db, &iNew2, &rc); } pNewP = copyTreeNode(db, pParent, &iNewP, &rc); if( iDir==-1 ){ pNew1->aiKeyPtr[1] = pPeer->aiKeyPtr[0]; if( bLeaf==0 ){ pNew1->aiChildPtr[1] = getChildPtr(pPeer, WORKING_VERSION, 0); pNew1->aiChildPtr[2] = getChildPtr(pPeer, WORKING_VERSION, 1); } pNewP->aiChildPtr[iPSlot-1] = iNew1; pNewP->aiKeyPtr[iPSlot-1] = pPeer->aiKeyPtr[1]; pNewP->aiChildPtr[iPSlot] = iNew2; pNew2->aiKeyPtr[0] = pPeer->aiKeyPtr[2]; pNew2->aiKeyPtr[1] = pParent->aiKeyPtr[iPSlot-1]; if( bLeaf==0 ){ pNew2->aiChildPtr[0] = getChildPtr(pPeer, WORKING_VERSION, 2); pNew2->aiChildPtr[1] = getChildPtr(pPeer, WORKING_VERSION, 3); pNew2->aiChildPtr[2] = iNewptr; } }else{ pNew1->aiKeyPtr[1] = pParent->aiKeyPtr[iPSlot]; if( bLeaf==0 ){ pNew1->aiChildPtr[1] = iNewptr; pNew1->aiChildPtr[2] = getChildPtr(pPeer, WORKING_VERSION, 0); } pNewP->aiChildPtr[iPSlot] = iNew1; pNewP->aiKeyPtr[iPSlot] = pPeer->aiKeyPtr[0]; pNewP->aiChildPtr[iPSlot+1] = iNew2; pNew2->aiKeyPtr[0] = pPeer->aiKeyPtr[1]; pNew2->aiKeyPtr[1] = pPeer->aiKeyPtr[2]; if( bLeaf==0 ){ pNew2->aiChildPtr[0] = getChildPtr(pPeer, WORKING_VERSION, 1); pNew2->aiChildPtr[1] = getChildPtr(pPeer, WORKING_VERSION, 2); pNew2->aiChildPtr[2] = getChildPtr(pPeer, WORKING_VERSION, 3); } } assert( pCsr->iNode>=1 ); pCsr->iNode -= 2; if( rc==LSM_OK ){ assert( pNew1->aiKeyPtr[1] && pNew2->aiKeyPtr[1] ); rc = treeUpdatePtr(db, pCsr, iNewP); } }else{ int iKOut = 0; int iPOut = 0; int i; pCsr->iNode--; if( iDir==1 ){ pNew1->aiKeyPtr[iKOut++] = pParent->aiKeyPtr[iPSlot]; if( bLeaf==0 ) pNew1->aiChildPtr[iPOut++] = iNewptr; } for(i=0; i<3; i++){ if( pPeer->aiKeyPtr[i] ){ pNew1->aiKeyPtr[iKOut++] = pPeer->aiKeyPtr[i]; } } if( bLeaf==0 ){ for(i=0; i<4; i++){ if( getChildPtr(pPeer, WORKING_VERSION, i) ){ pNew1->aiChildPtr[iPOut++] = getChildPtr(pPeer, WORKING_VERSION, i); } } } if( iDir==-1 ){ iPSlot--; pNew1->aiKeyPtr[iKOut++] = pParent->aiKeyPtr[iPSlot]; if( bLeaf==0 ) pNew1->aiChildPtr[iPOut++] = iNewptr; pCsr->aiCell[pCsr->iNode] = iPSlot; } rc = treeDeleteEntry(db, pCsr, iNew1); } } return rc; } /* ** Delete a range of keys from the tree structure (i.e. the lsm_delete_range() ** function, not lsm_delete()). ** ** This is a two step process: ** ** 1) Remove all entries currently stored in the tree that have keys ** that fall into the deleted range. ** ** TODO: There are surely good ways to optimize this step - removing ** a range of keys from a b-tree. But for now, this function removes ** them one at a time using the usual approach. ** ** 2) Unless the largest key smaller than or equal to (pKey1/nKey1) is ** already marked as START_DELETE, insert a START_DELETE key. ** Similarly, unless the smallest key greater than or equal to ** (pKey2/nKey2) is already START_END, insert a START_END key. */ int lsmTreeDelete( lsm_db *db, void *pKey1, int nKey1, /* Start of range */ void *pKey2, int nKey2 /* End of range */ ){ int rc = LSM_OK; int bDone = 0; TreeRoot *p = &db->treehdr.root; TreeBlob blob = {0, 0}; /* The range must be sensible - that (key1 < key2). */ assert( db->xCmp(pKey1, nKey1, pKey2, nKey2)<0 ); assert( assert_delete_ranges_match(db) ); #if 0 static int nCall = 0; printf("\n"); nCall++; printf("%d delete %s .. %s\n", nCall, (char *)pKey1, (char *)pKey2); dump_tree_contents(db, "before delete"); #endif /* Step 1. This loop runs until the tree contains no keys within the ** range being deleted. Or until an error occurs. */ while( bDone==0 && rc==LSM_OK ){ int res; TreeCursor csr; /* Cursor to seek to first key in range */ void *pDel; int nDel; /* Key to (possibly) delete this iteration */ #ifndef NDEBUG int nEntry = treeCountEntries(db); #endif /* Seek the cursor to the first entry in the tree greater than pKey1. */ treeCursorInit(db, 0, &csr); lsmTreeCursorSeek(&csr, pKey1, nKey1, &res); if( res<=0 && lsmTreeCursorValid(&csr) ) lsmTreeCursorNext(&csr); /* If there is no such entry, or if it is greater than pKey2, then the ** tree now contains no keys in the range being deleted. In this case ** break out of the loop. */ bDone = 1; if( lsmTreeCursorValid(&csr) ){ lsmTreeCursorKey(&csr, 0, &pDel, &nDel); if( db->xCmp(pDel, nDel, pKey2, nKey2)<0 ) bDone = 0; } if( bDone==0 ){ if( csr.iNode==(p->nHeight-1) ){ /* The element to delete already lies on a leaf node */ rc = treeDeleteEntry(db, &csr, 0); }else{ /* 1. Overwrite the current key with a copy of the next key in the ** tree (key N). ** ** 2. Seek to key N (cursor will stop at the internal node copy of ** N). Move to the next key (original copy of N). Delete ** this entry. */ u32 iKey; TreeKey *pKey; int iNode = csr.iNode; lsmTreeCursorNext(&csr); assert( csr.iNode==(p->nHeight-1) ); iKey = csr.apTreeNode[csr.iNode]->aiKeyPtr[csr.aiCell[csr.iNode]]; lsmTreeCursorPrev(&csr); treeOverwriteKey(db, &csr, iKey, &rc); pKey = treeShmkey(db, iKey, TK_LOADKEY, &blob, &rc); if( pKey ){ rc = lsmTreeCursorSeek(&csr, TK_KEY(pKey), pKey->nKey, &res); } if( rc==LSM_OK ){ assert( res==0 && csr.iNode==iNode ); rc = lsmTreeCursorNext(&csr); if( rc==LSM_OK ){ rc = treeDeleteEntry(db, &csr, 0); } } } } /* Clean up any memory allocated by the cursor. */ tblobFree(db, &csr.blob); #if 0 dump_tree_contents(db, "ddd delete"); #endif assert( bDone || treeCountEntries(db)==(nEntry-1) ); } #if 0 dump_tree_contents(db, "during delete"); #endif /* Now insert the START_DELETE and END_DELETE keys. */ if( rc==LSM_OK ){ rc = treeInsertEntry(db, LSM_START_DELETE, pKey1, nKey1, 0, -1); } #if 0 dump_tree_contents(db, "during delete 2"); #endif if( rc==LSM_OK ){ rc = treeInsertEntry(db, LSM_END_DELETE, pKey2, nKey2, 0, -1); } #if 0 dump_tree_contents(db, "after delete"); #endif tblobFree(db, &blob); assert( assert_delete_ranges_match(db) ); return rc; } /* ** Return, in bytes, the amount of memory currently used by the tree ** structure. */ int lsmTreeSize(lsm_db *pDb){ return pDb->treehdr.nByte; |
︙ | ︙ | |||
1547 1548 1549 1550 1551 1552 1553 | if( iCell<3 && pCsr->apTreeNode[pCsr->iNode]->aiKeyPtr[iCell] ) break; } } #ifndef NDEBUG if( pCsr->iNode>=0 ){ TreeKey *pK2 = csrGetKey(pCsr, &pCsr->blob, &rc); | | | 2092 2093 2094 2095 2096 2097 2098 2099 2100 2101 2102 2103 2104 2105 2106 | if( iCell<3 && pCsr->apTreeNode[pCsr->iNode]->aiKeyPtr[iCell] ) break; } } #ifndef NDEBUG if( pCsr->iNode>=0 ){ TreeKey *pK2 = csrGetKey(pCsr, &pCsr->blob, &rc); assert( rc || pDb->xCmp(TK_KEY(pK2),pK2->nKey,TK_KEY(pK1),pK1->nKey)>=0 ); } tblobFree(pDb, &key1); #endif return rc; } |
︙ | ︙ | |||
1629 1630 1631 1632 1633 1634 1635 | /* ** Move the cursor to the first (bLast==0) or last (bLast!=0) entry in the ** in-memory tree. */ int lsmTreeCursorEnd(TreeCursor *pCsr, int bLast){ lsm_db *pDb = pCsr->pDb; | < | 2174 2175 2176 2177 2178 2179 2180 2181 2182 2183 2184 2185 2186 2187 | /* ** Move the cursor to the first (bLast==0) or last (bLast!=0) entry in the ** in-memory tree. */ int lsmTreeCursorEnd(TreeCursor *pCsr, int bLast){ lsm_db *pDb = pCsr->pDb; TreeRoot *pRoot = pCsr->pRoot; int rc = LSM_OK; u32 iNodePtr; pCsr->iNode = -1; /* Discard any saved position data */ |
︙ | ︙ | |||
1666 1667 1668 1669 1670 1671 1672 | } pCsr->aiCell[pCsr->iNode] = iCell - (iNodePtr==0 && bLast); } return rc; } | > > > > > > > > > > > > > | > > | < > | 2210 2211 2212 2213 2214 2215 2216 2217 2218 2219 2220 2221 2222 2223 2224 2225 2226 2227 2228 2229 2230 2231 2232 2233 2234 2235 2236 2237 2238 2239 2240 2241 2242 2243 2244 2245 2246 2247 2248 2249 2250 2251 2252 2253 2254 2255 2256 2257 2258 2259 2260 2261 2262 2263 2264 2265 2266 2267 2268 2269 | } pCsr->aiCell[pCsr->iNode] = iCell - (iNodePtr==0 && bLast); } return rc; } int lsmTreeCursorFlags(TreeCursor *pCsr){ int flags = 0; if( pCsr && pCsr->iNode>=0 ){ int rc = LSM_OK; TreeKey *pKey = (TreeKey *)treeShmptr(pCsr->pDb, pCsr->apTreeNode[pCsr->iNode]->aiKeyPtr[pCsr->aiCell[pCsr->iNode]], &rc ); assert( rc==LSM_OK ); flags = pKey->flags; } return flags; } int lsmTreeCursorKey(TreeCursor *pCsr, int *pFlags, void **ppKey, int *pnKey){ TreeKey *pTreeKey; int rc = LSM_OK; assert( lsmTreeCursorValid(pCsr) ); pTreeKey = pCsr->pSave; if( !pTreeKey ){ pTreeKey = csrGetKey(pCsr, &pCsr->blob, &rc); } if( rc==LSM_OK ){ *pnKey = pTreeKey->nKey; if( pFlags ) *pFlags = pTreeKey->flags; *ppKey = (void *)&pTreeKey[1]; } return rc; } int lsmTreeCursorValue(TreeCursor *pCsr, void **ppVal, int *pnVal){ int res = 0; int rc; rc = treeCursorRestore(pCsr, &res); if( res==0 ){ TreeKey *pTreeKey = csrGetKey(pCsr, &pCsr->blob, &rc); if( rc==LSM_OK ){ if( pTreeKey->flags & LSM_INSERT ){ *pnVal = pTreeKey->nValue; *ppVal = TK_VAL(pTreeKey); }else{ *ppVal = 0; *pnVal = -1; } } }else{ *ppVal = 0; *pnVal = 0; } |
︙ | ︙ | |||
1838 1839 1840 1841 1842 1843 1844 1845 | memcpy(&pShm->hdr1, &pDb->treehdr, sizeof(TreeHeader)); } pShm->bWriter = 0; intArrayFree(pDb->pEnv, &pDb->rollback); return LSM_OK; } | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 2397 2398 2399 2400 2401 2402 2403 2404 2405 2406 2407 2408 2409 2410 2411 2412 2413 2414 2415 2416 2417 2418 2419 2420 2421 2422 2423 2424 2425 2426 2427 2428 2429 2430 2431 2432 2433 2434 2435 2436 2437 2438 2439 2440 2441 2442 2443 2444 2445 2446 2447 | memcpy(&pShm->hdr1, &pDb->treehdr, sizeof(TreeHeader)); } pShm->bWriter = 0; intArrayFree(pDb->pEnv, &pDb->rollback); return LSM_OK; } #ifndef NDEBUG static int assert_delete_ranges_match(lsm_db *db){ int prev = 0; TreeBlob blob = {0, 0}; TreeCursor csr; /* Cursor used to iterate through tree */ int rc; treeCursorInit(db, 0, &csr); for( rc = lsmTreeCursorEnd(&csr, 0); rc==LSM_OK && lsmTreeCursorValid(&csr); rc = lsmTreeCursorNext(&csr) ){ TreeKey *pKey = csrGetKey(&csr, &blob, &rc); if( rc!=LSM_OK ) break; assert( ((prev&LSM_START_DELETE)==0)==((pKey->flags&LSM_END_DELETE)==0) ); prev = pKey->flags; } tblobFree(csr.pDb, &csr.blob); return 1; } static int treeCountEntries(lsm_db *db){ TreeCursor csr; /* Cursor used to iterate through tree */ int rc; int nEntry = 0; treeCursorInit(db, 0, &csr); for( rc = lsmTreeCursorEnd(&csr, 0); rc==LSM_OK && lsmTreeCursorValid(&csr); rc = lsmTreeCursorNext(&csr) ){ nEntry++; } tblobFree(csr.pDb, &csr.blob); return nEntry; } #endif |