Overview
Comment: | Improvements to comments in the multi-threaded sorter. Also include a function name change for clarity. And add a test to help show that the MergeEngine object is only used by a single thread. |
---|---|
SHA1: | 9af50a878f67c1c2a4f1520160cc9896 |
User & Date: | drh 2014-07-28 14:54:50.442 |
Context
2014-07-28
15:01 | Merge recent trunk changes into the threads branch. (check-in: 163c247bd8 user: drh tags: threads) | |
14:54 | Improvements to comments in the multi-threaded sorter. Also include a function name change for clarity. And add a test to help show that the MergeEngine object is only used by a single thread. (check-in: 9af50a878f user: drh tags: threads) | |
2014-07-24
16:54 | Merge all recent trunk changes into the threads branch. (check-in: 770685892c user: drh tags: threads) | |
Changes
Changes to src/vdbesort.c.
︙ | ︙ | |||
81 82 83 84 85 86 87 | ** A PMA created at this point is known as a "level-0 PMA". Higher levels ** of PMAs may be created by merging existing PMAs together - for example ** merging two or more level-0 PMAs together creates a level-1 PMA. ** ** The threshold for the amount of main memory to use before flushing ** records to a PMA is roughly the same as the limit configured for the ** page-cache of the main database. Specifically, the threshold is set to | | | 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 | ** A PMA created at this point is known as a "level-0 PMA". Higher levels ** of PMAs may be created by merging existing PMAs together - for example ** merging two or more level-0 PMAs together creates a level-1 PMA. ** ** The threshold for the amount of main memory to use before flushing ** records to a PMA is roughly the same as the limit configured for the ** page-cache of the main database. Specifically, the threshold is set to ** the value returned by "PRAGMA main.page_size" multiplied by ** that returned by "PRAGMA main.cache_size", in bytes. ** ** If the sorter is running in single-threaded mode, then all PMAs generated ** are appended to a single temporary file. Or, if the sorter is running in ** multi-threaded mode then up to (N+1) temporary files may be opened, where ** N is the configured number of worker threads. In this case, instead of ** sorting the records and writing the PMA to a temporary file itself, the |
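The threshold rule in the comment above can be written out as a short calculation. This is a minimal sketch, not the sorter's code: `sorterThresholdBytes` is a hypothetical name, and it assumes both inputs are the plain values the comment describes (a page size in bytes and a non-negative cache size in pages).

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical helper (not part of vdbesort.c): bytes of in-memory records
** to accumulate before flushing to a PMA. Assumes pageSize is in bytes and
** cacheSize is a non-negative page count. */
static int64_t sorterThresholdBytes(int pageSize, int cacheSize){
  assert( pageSize>0 && cacheSize>=0 );
  /* Widen before multiplying so large caches do not overflow an int. */
  return (int64_t)pageSize * (int64_t)cacheSize;
}
```

With a 4096-byte page and a 2000-page cache, the sketch yields 8,192,000 bytes.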
︙ | ︙ | |||
186 187 188 189 190 191 192 193 194 195 196 197 198 199 | ** The MergeEngine object is used to combine two or more smaller PMAs into ** one big PMA using a merge operation. Separate PMAs all need to be ** combined into one big PMA in order to be able to step through the sorted ** records in order. ** ** The aReadr[] array contains a PmaReader object for each of the PMAs being ** merged. An aReadr[] object either points to a valid key or else is at EOF. ** For the purposes of the paragraphs below, we assume that the array is ** actually N elements in size, where N is the smallest power of 2 greater ** than or equal to the number of PMAs being merged. The extra aReadr[] elements ** are treated as if they are empty (always at EOF). ** ** The aTree[] array is also N elements in size. The value of N is stored in ** the MergeEngine.nTree variable. | > | 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 | ** The MergeEngine object is used to combine two or more smaller PMAs into ** one big PMA using a merge operation. Separate PMAs all need to be ** combined into one big PMA in order to be able to step through the sorted ** records in order. ** ** The aReadr[] array contains a PmaReader object for each of the PMAs being ** merged. An aReadr[] object either points to a valid key or else is at EOF. ** ("EOF" means "End Of File". When aReadr[] is at EOF there is no more data.) ** For the purposes of the paragraphs below, we assume that the array is ** actually N elements in size, where N is the smallest power of 2 greater ** than or equal to the number of PMAs being merged. The extra aReadr[] elements ** are treated as if they are empty (always at EOF). ** ** The aTree[] array is also N elements in size. The value of N is stored in ** the MergeEngine.nTree variable. |
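To make the aReadr[]/aTree[] description concrete, here is a small tournament-tree merge over sorted integer arrays. It is a sketch under stated assumptions rather than the sorter's implementation: `Reader`, `Merger`, `mergerCompare` and `mergeSorted` are hypothetical names, the inputs are in-memory arrays instead of PMAs, and the node numbering (aTree[1] as the root, bottom-row nodes comparing adjacent readers) is one layout consistent with the comment, assumed here for illustration.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-in for PmaReader: a cursor over a sorted int array. */
typedef struct Reader {
  const int *a;     /* Sorted input values */
  int n;            /* Number of values in a[] */
  int i;            /* Current position. i>=n means EOF */
} Reader;

/* Simplified stand-in for MergeEngine. */
typedef struct Merger {
  int nTree;        /* Power of two >= number of inputs */
  int *aTree;       /* aTree[1] is the index of the reader with the smallest key */
  Reader *aReadr;   /* nTree readers. Padding entries are always at EOF */
} Merger;

static int readerEof(const Reader *p){ return p->i>=p->n; }

/* Recompute aTree[iNode] from its two children (at most one key comparison).
** A reader at EOF always loses to a reader that still has data. */
static void mergerCompare(Merger *p, int iNode){
  int i1, i2;
  const Reader *p1, *p2;
  if( iNode>=p->nTree/2 ){          /* Bottom row: children are readers */
    i1 = (iNode - p->nTree/2)*2;
    i2 = i1 + 1;
  }else{                            /* Upper rows: children are earlier winners */
    i1 = p->aTree[iNode*2];
    i2 = p->aTree[iNode*2+1];
  }
  p1 = &p->aReadr[i1];
  p2 = &p->aReadr[i2];
  if( readerEof(p1) ){
    p->aTree[iNode] = i2;
  }else if( readerEof(p2) ){
    p->aTree[iNode] = i1;
  }else{
    p->aTree[iNode] = (p1->a[p1->i] <= p2->a[p2->i]) ? i1 : i2;
  }
}

/* Merge nIn sorted inputs into aOut[], which must have room for every value.
** Returns the number of values written, or -1 if allocation fails. */
static int mergeSorted(const Reader *aIn, int nIn, int *aOut){
  Merger m;
  int N = 2, i, nOut = 0;
  assert( nIn>=1 );
  while( N<nIn ) N += N;                         /* Round up to a power of two */
  m.nTree = N;
  m.aTree = calloc((size_t)N, sizeof(int));
  m.aReadr = calloc((size_t)N, sizeof(Reader));  /* Zeroed readers are at EOF */
  if( m.aTree==0 || m.aReadr==0 ){
    free(m.aTree);
    free(m.aReadr);
    return -1;
  }
  for(i=0; i<nIn; i++) m.aReadr[i] = aIn[i];
  for(i=N-1; i>0; i--) mergerCompare(&m, i);     /* Build the tree bottom-up */
  while( !readerEof(&m.aReadr[m.aTree[1]]) ){
    int iPrev = m.aTree[1];                      /* Reader holding the smallest key */
    aOut[nOut++] = m.aReadr[iPrev].a[m.aReadr[iPrev].i++];
    /* Only the log2(N) nodes between that reader and the root can change. */
    for(i=(N+iPrev)/2; i>0; i=i/2) mergerCompare(&m, i);
  }
  free(m.aTree);
  free(m.aReadr);
  return nOut;
}
```

Padding the reader array to a power of two keeps the tree perfectly balanced, so each step touches exactly log2(N) nodes regardless of which input supplied the previous key.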
︙ | ︙ | |||
243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 | ** ** In other words, each time we advance to the next sorter element, log2(N) ** key comparison operations are required, where N is the number of segments ** being merged (rounded up to the next power of 2). */ struct MergeEngine { int nTree; /* Used size of aTree/aReadr (power of 2) */ int *aTree; /* Current state of incremental merge */ PmaReader *aReadr; /* Array of PmaReaders to merge data from */ }; /* ** Exactly VdbeSorter.nTask instances of this object are allocated ** as part of each VdbeSorter object. Instances are never allocated any ** other way. VdbeSorter.nTask is set to the number of worker threads allowed | > > | > > | 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 | ** ** In other words, each time we advance to the next sorter element, log2(N) ** key comparison operations are required, where N is the number of segments ** being merged (rounded up to the next power of 2). */ struct MergeEngine { int nTree; /* Used size of aTree/aReadr (power of 2) */ SortSubtask *pTask; /* Used by this thread only */ int *aTree; /* Current state of incremental merge */ PmaReader *aReadr; /* Array of PmaReaders to merge data from */ }; /* ** This object represents a single thread of control in a sort operation. ** Exactly VdbeSorter.nTask instances of this object are allocated ** as part of each VdbeSorter object. Instances are never allocated any ** other way. VdbeSorter.nTask is set to the number of worker threads allowed ** (see SQLITE_CONFIG_WORKER_THREADS) plus one (the main thread). Thus for ** single-threaded operation, there is exactly one instance of this object ** and for multi-threaded operation there are two or more instances. ** ** Essentially, this structure contains all those fields of the VdbeSorter ** structure for which each thread requires a separate instance. 
For example, ** each thread requires its own UnpackedRecord object to unpack records in ** as part of comparison operations. ** ** Before a background thread is launched, variable bDone is set to 0. Then, |
︙ | ︙ | |||
439 440 441 442 443 444 445 | /* Maximum number of PMAs that a single MergeEngine can merge */ #define SORTER_MAX_MERGE_COUNT 16 static int vdbeIncrSwap(IncrMerger*); static void vdbeIncrFree(IncrMerger *); /* | | | | | 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 | /* Maximum number of PMAs that a single MergeEngine can merge */ #define SORTER_MAX_MERGE_COUNT 16 static int vdbeIncrSwap(IncrMerger*); static void vdbeIncrFree(IncrMerger *); /* ** Free all memory belonging to the PmaReader object passed as the ** argument. All structure fields are set to zero before returning. */ static void vdbePmaReaderClear(PmaReader *pReadr){ sqlite3_free(pReadr->aAlloc); sqlite3_free(pReadr->aBuffer); if( pReadr->aMap ) sqlite3OsUnfetch(pReadr->pFile, 0, pReadr->aMap); vdbeIncrFree(pReadr->pIncr); memset(pReadr, 0, sizeof(PmaReader)); } /* ** Read the next nByte bytes of data from the PMA p. ** If successful, set *ppOut to point to a buffer containing the data ** and return SQLITE_OK. Otherwise, if an error occurs, return an SQLite ** error code. ** ** The buffer returned in *ppOut is only valid until the ** next call to this function. */ static int vdbePmaReadBlob( PmaReader *p, /* PmaReader from which to take the blob */ int nByte, /* Bytes of data to read */ u8 **ppOut /* OUT: Pointer to buffer containing data */ ){ |
︙ | ︙ | |||
590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 | ** Or, if an error occurs, return an SQLite error code. The final value of ** *pp is undefined in this case. */ static int vdbeSorterMapFile(SortSubtask *pTask, SorterFile *pFile, u8 **pp){ int rc = SQLITE_OK; if( pFile->iEof<=(i64)(pTask->pSorter->db->nMaxSorterMmap) ){ rc = sqlite3OsFetch(pFile->pFd, 0, (int)pFile->iEof, (void**)pp); } return rc; } /* ** Seek PmaReader pReadr to offset iOff within file pFile. Return SQLITE_OK ** if successful, or an SQLite error code if an error occurs. */ static int vdbePmaReaderSeek( SortSubtask *pTask, /* Task context */ | > | | 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 | ** Or, if an error occurs, return an SQLite error code. The final value of ** *pp is undefined in this case. */ static int vdbeSorterMapFile(SortSubtask *pTask, SorterFile *pFile, u8 **pp){ int rc = SQLITE_OK; if( pFile->iEof<=(i64)(pTask->pSorter->db->nMaxSorterMmap) ){ rc = sqlite3OsFetch(pFile->pFd, 0, (int)pFile->iEof, (void**)pp); testcase( rc!=SQLITE_OK ); } return rc; } /* ** Seek PmaReader pReadr to offset iOff within file pFile. Return SQLITE_OK ** if successful, or an SQLite error code if an error occurs. */ static int vdbePmaReaderSeek( SortSubtask *pTask, /* Task context */ PmaReader *pReadr, /* Reader whose cursor is to be moved */ SorterFile *pFile, /* Sorter file to read from */ i64 iOff /* Offset in pFile */ ){ int rc = SQLITE_OK; assert( pReadr->pIncr==0 || pReadr->pIncr->bEof==0 ); |
︙ | ︙ | |||
633 634 635 636 637 638 639 640 641 642 643 644 645 646 | int nRead = pgsz - iBuf; if( (pReadr->iReadOff + nRead) > pReadr->iEof ){ nRead = (int)(pReadr->iEof - pReadr->iReadOff); } rc = sqlite3OsRead( pReadr->pFile, &pReadr->aBuffer[iBuf], nRead, pReadr->iReadOff ); } } return rc; } /* | > | 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 | int nRead = pgsz - iBuf; if( (pReadr->iReadOff + nRead) > pReadr->iEof ){ nRead = (int)(pReadr->iEof - pReadr->iReadOff); } rc = sqlite3OsRead( pReadr->pFile, &pReadr->aBuffer[iBuf], nRead, pReadr->iReadOff ); testcase( rc!=SQLITE_OK ); } } return rc; } /* |
︙ | ︙ | |||
664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 | bEof = 0; } } if( bEof ){ /* This is an EOF condition */ vdbePmaReaderClear(pReadr); return rc; } } if( rc==SQLITE_OK ){ rc = vdbePmaReadVarint(pReadr, &nRec); } if( rc==SQLITE_OK ){ pReadr->nKey = (int)nRec; rc = vdbePmaReadBlob(pReadr, (int)nRec, &pReadr->aKey); } return rc; } /* ** Initialize PmaReader pReadr to scan through the PMA stored in file pFile | > > | 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 | bEof = 0; } } if( bEof ){ /* This is an EOF condition */ vdbePmaReaderClear(pReadr); testcase( rc!=SQLITE_OK ); return rc; } } if( rc==SQLITE_OK ){ rc = vdbePmaReadVarint(pReadr, &nRec); } if( rc==SQLITE_OK ){ pReadr->nKey = (int)nRec; rc = vdbePmaReadBlob(pReadr, (int)nRec, &pReadr->aKey); testcase( rc!=SQLITE_OK ); } return rc; } /* ** Initialize PmaReader pReadr to scan through the PMA stored in file pFile |
︙ | ︙ | |||
1022 1023 1024 1025 1026 1027 1028 | } #else # define vdbeSorterJoinAll(x,rcin) (rcin) # define vdbeSorterJoinThread(pTask) SQLITE_OK #endif /* | | > > > > > | 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 | } #else # define vdbeSorterJoinAll(x,rcin) (rcin) # define vdbeSorterJoinThread(pTask) SQLITE_OK #endif /* ** Allocate a new MergeEngine object capable of handling up to ** nReader PmaReader inputs. ** ** nReader is automatically rounded up to the next power of two. ** nReader may not exceed SORTER_MAX_MERGE_COUNT even after rounding up. */ static MergeEngine *vdbeMergeEngineNew(int nReader){ int N = 2; /* Smallest power of two >= nReader */ int nByte; /* Total bytes of space to allocate */ MergeEngine *pNew; /* Pointer to allocated object to return */ assert( nReader<=SORTER_MAX_MERGE_COUNT ); while( N<nReader ) N += N; nByte = sizeof(MergeEngine) + N * (sizeof(int) + sizeof(PmaReader)); pNew = sqlite3FaultSim(100) ? 0 : (MergeEngine*)sqlite3MallocZero(nByte); if( pNew ){ pNew->nTree = N; pNew->pTask = 0; pNew->aReadr = (PmaReader*)&pNew[1]; pNew->aTree = (int*)&pNew->aReadr[N]; } return pNew; } /* |
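The allocation pattern in vdbeMergeEngineNew() (one zeroed block holding the header followed by both arrays) can be sketched in portable C. The types and names below (`Engine`, `Reader`, `engineNew`, `ENGINE_MAX_INPUTS`) are hypothetical, and `calloc` stands in for `sqlite3MallocZero`; the fault-injection hook from the diff is omitted.

```c
#include <assert.h>
#include <stdlib.h>

#define ENGINE_MAX_INPUTS 16   /* Hypothetical analogue of SORTER_MAX_MERGE_COUNT */

/* Hypothetical stand-ins for PmaReader and MergeEngine. */
typedef struct Reader {
  const unsigned char *aKey;   /* Current key, or NULL */
  int nKey;                    /* Size of aKey in bytes */
} Reader;

typedef struct Engine {
  int nTree;                   /* Size of aTree[] and aReadr[] (power of two) */
  int *aTree;                  /* Points into the same allocation */
  Reader *aReadr;              /* Points into the same allocation */
} Engine;

/* Allocate an Engine with room for nReader inputs, rounded up to a power
** of two. Layout of the single block: [Engine][N Readers][N ints].
** The caller releases everything with one free(). Returns NULL on OOM. */
static Engine *engineNew(int nReader){
  int N = 2;                   /* Smallest power of two >= nReader (minimum 2) */
  size_t nByte;
  Engine *pNew;

  assert( nReader>=1 && nReader<=ENGINE_MAX_INPUTS );
  while( N<nReader ) N += N;
  nByte = sizeof(Engine) + (size_t)N * (sizeof(int) + sizeof(Reader));
  pNew = calloc(1, nByte);
  if( pNew ){
    pNew->nTree = N;
    pNew->aReadr = (Reader*)&pNew[1];        /* Readers start right after the header */
    pNew->aTree = (int*)&pNew->aReadr[N];    /* ints start right after the readers */
  }
  return pNew;
}
```

In this sketch the pointer-bearing `Reader` array comes before the `int` array, which keeps both arrays suitably aligned without padding. A single allocation also means there is only one failure point to handle and one pointer to free.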
︙ | ︙ | |||
1434 1435 1436 1437 1438 1439 1440 | vdbeSorterWorkDebug(pTask, "exit"); assert( rc!=SQLITE_OK || pList->pList==0 ); assert( rc!=SQLITE_OK || pTask->file.iEof==iSz ); return rc; } /* | | | | | | | | > > > > | 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 | vdbeSorterWorkDebug(pTask, "exit"); assert( rc!=SQLITE_OK || pList->pList==0 ); assert( rc!=SQLITE_OK || pTask->file.iEof==iSz ); return rc; } /* ** Advance the MergeEngine pMerger (passed as the second argument) to ** its next entry. Set *pbEof to true if there is no next entry because ** the MergeEngine has reached the end of all its inputs. ** ** Return SQLITE_OK if successful or an error code if an error occurs. */ static int vdbeMergeEngineStep( SortSubtask *pTask, /* The thread in which this MergeEngine runs */ MergeEngine *pMerger, /* The merge engine to advance to the next row */ int *pbEof /* Set TRUE at EOF. Set false for more content */ ){ int rc; int iPrev = pMerger->aTree[1];/* Index of PmaReader to advance */ /* A MergeEngine object is only used by a single thread */ assert( pMerger->pTask==0 || pMerger->pTask==pTask ); pMerger->pTask = pTask; /* Advance the current PmaReader */ rc = vdbePmaReaderNext(&pMerger->aReadr[iPrev]); /* Update contents of aTree[] */ if( rc==SQLITE_OK ){ int i; /* Index of aTree[] to recalculate */ |
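The single-owner check added to vdbeMergeEngineStep() follows a general pattern: record the first caller in the object and assert that every later caller is the same one. A minimal stand-alone sketch, with hypothetical `Task` and `Engine` types in place of SortSubtask and MergeEngine:

```c
#include <assert.h>
#include <stddef.h>

typedef struct Task { int id; } Task;   /* Hypothetical stand-in for SortSubtask */

typedef struct Engine {
  Task *pTask;    /* First task to step this engine, or NULL if never stepped */
  int nStep;      /* Number of steps taken (placeholder for real merge state) */
} Engine;

/* Advance the engine on behalf of pTask. The assert documents, and in
** builds without NDEBUG verifies, that only one task ever steps a
** given engine. */
static void engineStep(Task *pTask, Engine *p){
  assert( p->pTask==NULL || p->pTask==pTask );
  p->pTask = pTask;   /* Remember the owner for later calls */
  p->nStep++;
}
```

Because the check lives in an `assert()`, it costs nothing when assertions are compiled out, and the owner field then serves only as documentation. The check is a debugging aid rather than a synchronization mechanism: it can catch a second task using the engine, but it does not itself make concurrent use safe.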
︙ | ︙ | |||
1716 1717 1718 1719 1720 1721 1722 | ** In either case exit the loop. */ if( pReader->pFile==0 ) break; if( (iEof + nKey + sqlite3VarintLen(nKey))>(iStart + pIncr->mxSz) ) break; /* Write the next key to the output. */ vdbePmaWriteVarint(&writer, nKey); vdbePmaWriteBlob(&writer, pReader->aKey, nKey); | | | 1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 | ** In either case exit the loop. */ if( pReader->pFile==0 ) break; if( (iEof + nKey + sqlite3VarintLen(nKey))>(iStart + pIncr->mxSz) ) break; /* Write the next key to the output. */ vdbePmaWriteVarint(&writer, nKey); vdbePmaWriteBlob(&writer, pReader->aKey, nKey); rc = vdbeMergeEngineStep(pTask, pIncr->pMerger, &dummy); } rc2 = vdbePmaWriterFinish(&writer, &pOut->iEof); if( rc==SQLITE_OK ) rc = rc2; vdbeSorterPopulateDebug(pTask, "exit"); return rc; } |
︙ | ︙ | |||
2124 2125 2126 2127 2128 2129 2130 | ** can be used to incrementally merge all PMAs on disk. ** ** If successful, SQLITE_OK is returned and *ppOut set to point to the ** MergeEngine object at the root of the tree before returning. Or, if an ** error occurs, an SQLite error code is returned and the final value ** of *ppOut is undefined. */ | | > > > | 2142 2143 2144 2145 2146 2147 2148 2149 2150 2151 2152 2153 2154 2155 2156 2157 2158 2159 | ** can be used to incrementally merge all PMAs on disk. ** ** If successful, SQLITE_OK is returned and *ppOut set to point to the ** MergeEngine object at the root of the tree before returning. Or, if an ** error occurs, an SQLite error code is returned and the final value ** of *ppOut is undefined. */ static int vdbeSorterMergeTreeBuild( VdbeSorter *pSorter, /* The VDBE cursor that implements the sort */ MergeEngine **ppOut /* Write the MergeEngine here */ ){ MergeEngine *pMain = 0; int rc = SQLITE_OK; int iTask; #if SQLITE_MAX_WORKER_THREADS>0 /* If the sorter uses more than one task, then create the top-level ** MergeEngine here. This MergeEngine will read data from exactly |
︙ | ︙ | |||
2332 2333 2334 2335 2336 2337 2338 | #if SQLITE_MAX_WORKER_THREADS>0 if( pSorter->bUseThreads ){ rc = vdbePmaReaderNext(pSorter->pReader); *pbEof = (pSorter->pReader->pFile==0); }else #endif /*if( !pSorter->bUseThreads )*/ { | | | 2353 2354 2355 2356 2357 2358 2359 2360 2361 2362 2363 2364 2365 2366 2367 | #if SQLITE_MAX_WORKER_THREADS>0 if( pSorter->bUseThreads ){ rc = vdbePmaReaderNext(pSorter->pReader); *pbEof = (pSorter->pReader->pFile==0); }else #endif /*if( !pSorter->bUseThreads )*/ { rc = vdbeMergeEngineStep(&pSorter->aTask[0], pSorter->pMerger, pbEof); } }else{ SorterRecord *pFree = pSorter->list.pList; pSorter->list.pList = pFree->u.pNext; pFree->u.pNext = 0; if( pSorter->list.aMemory==0 ) vdbeSorterRecordFree(db, pFree); *pbEof = !pSorter->list.pList; |
︙ | ︙ |