Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.
Overview
Comment: | Change the name of the SorterThread object to "SortSubtask" to avoid confusion with the SQLiteThread object. |
---|---|
Downloads: | Tarball | ZIP archive |
Timelines: | family | ancestors | descendants | both | threads |
Files: | files | file ages | folders |
SHA1: | 4ee2d910fbbed8d4def15e4e99ee2258 |
User & Date: | drh 2014-04-02 14:38:14.734 |
Context
2014-04-02
| ||
15:15 | Fix some problems with OOM handling in vdbesort.c. (check-in: 47e702bd83 user: dan tags: threads) | |
14:38 | Change the name of the SorterThread object to "SortSubtask" to avoid confusion with the SQLiteThread object. (check-in: 4ee2d910fb user: drh tags: threads) | |
2014-04-01
| ||
18:41 | When sorting data for a CREATE INDEX statement in single-threaded mode, assume that keys are delivered to the sorter in primary key order. Also fix various comments that had fallen out of date. (check-in: 821d1ac450 user: dan tags: threads) | |
Changes
Changes to src/vdbesort.c.
︙ | ︙ | |||
16 17 18 19 20 21 22 | */ #include "sqliteInt.h" #include "vdbeInt.h" typedef struct VdbeSorterIter VdbeSorterIter; | | | | | | | | | | | | | | | | | | | | | | | 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 | */ #include "sqliteInt.h" #include "vdbeInt.h" typedef struct VdbeSorterIter VdbeSorterIter; typedef struct SortSubtask SortSubtask; typedef struct SorterRecord SorterRecord; typedef struct SorterMerger SorterMerger; typedef struct FileWriter FileWriter; /* ** Candidate values for SortSubtask.eWork */ #define SORTER_THREAD_SORT 1 /* Sort records on pList */ #define SORTER_THREAD_TO_PMA 2 /* Xfer pList to Packed-Memory-Array pFile */ #define SORTER_THREAD_CONS 3 /* Consolidate multiple PMAs */ /* ** Much of the work performed in this module to sort the list of records is ** broken down into smaller units that may be peformed in parallel. In order ** to perform such a unit of work, an instance of the following structure ** is configured and passed to vdbeSortSubtaskMain() - either directly by ** the main thread or via a background thread. ** ** Exactly SortSubtask.nThread instances of this structure are allocated ** as part of each VdbeSorter object. Instances are never allocated any other ** way. SortSubtask.nThread is set to the number of worker threads allowed ** (see SQLITE_CONFIG_WORKER_THREADS) plus one (the main thread). ** ** When a background thread is launched to perform work, SortSubtask.bDone ** is set to 0 and the SortSubtask.pThread variable set to point to the ** thread handle. SortSubtask.bDone is set to 1 (to indicate to the main ** thread that joining SortSubtask.pThread will not block) before the thread ** exits. SortSubtask.pThread and bDone are always cleared after the ** background thread has been joined. 
** ** One object (specifically, VdbeSorter.aThread[SortSubtask.nThread-1]) ** is reserved for the foreground thread. ** ** The nature of the work performed is determined by SortSubtask.eWork, ** as follows: ** ** SORTER_THREAD_SORT: ** Sort the linked list of records at SortSubtask.pList. ** ** SORTER_THREAD_TO_PMA: ** Sort the linked list of records at SortSubtask.pList, and write ** the results to a new PMA in temp file SortSubtask.pTemp1. Open ** the temp file if it is not already open. ** ** SORTER_THREAD_CONS: ** Merge existing PMAs until SortSubtask.nConsolidate or fewer ** remain in temp file SortSubtask.pTemp1. */ struct SortSubtask { SQLiteThread *pThread; /* Thread handle, or NULL */ int bDone; /* Set to true by pThread when finished */ sqlite3_vfs *pVfs; /* VFS used to open temporary files */ KeyInfo *pKeyInfo; /* How to compare records */ UnpackedRecord *pUnpacked; /* Space to unpack a record */ int pgsz; /* Main database page size */ |
︙ | ︙ | |||
178 179 180 181 182 183 184 | SorterRecord *pRecord; /* Head of in-memory record list */ SorterMerger *pMerger; /* For final merge of PMAs (by caller) */ u8 *aMemory; /* Block of memory to alloc records from */ int iMemory; /* Offset of first free byte in aMemory */ int nMemory; /* Size of aMemory allocation in bytes */ int iPrev; /* Previous thread used to flush PMA */ int nThread; /* Size of aThread[] array */ | | | 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 | SorterRecord *pRecord; /* Head of in-memory record list */ SorterMerger *pMerger; /* For final merge of PMAs (by caller) */ u8 *aMemory; /* Block of memory to alloc records from */ int iMemory; /* Offset of first free byte in aMemory */ int nMemory; /* Size of aMemory allocation in bytes */ int iPrev; /* Previous thread used to flush PMA */ int nThread; /* Size of aThread[] array */ SortSubtask aThread[1]; }; /* ** The following type is an iterator for a PMA. It caches the current key in ** variables nKey/aKey. If the iterator is at EOF, pFile==0. */ struct VdbeSorterIter { |
︙ | ︙ | |||
423 424 425 426 427 428 429 | /* ** Initialize iterator pIter to scan through the PMA stored in file pFile ** starting at offset iStart and ending at offset iEof-1. This function ** leaves the iterator pointing to the first key in the PMA (or EOF if the ** PMA is empty). */ static int vdbeSorterIterInit( | | | 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 | /* ** Initialize iterator pIter to scan through the PMA stored in file pFile ** starting at offset iStart and ending at offset iEof-1. This function ** leaves the iterator pointing to the first key in the PMA (or EOF if the ** PMA is empty). */ static int vdbeSorterIterInit( SortSubtask *pThread, /* Thread context */ i64 iStart, /* Start offset in pThread->pTemp1 */ VdbeSorterIter *pIter, /* Iterator to populate */ i64 *pnByte /* IN/OUT: Increment this value by PMA size */ ){ int rc = SQLITE_OK; int nBuf = pThread->pgsz; void *pMap = 0; /* Mapping of temp file */ |
︙ | ︙ | |||
497 498 499 500 501 502 503 | ** is assumed that the (pThread->pUnpacked) structure already contains the ** unpacked key to use as key2. ** ** If an OOM error is encountered, (pThread->pUnpacked->error_rc) is set ** to SQLITE_NOMEM. */ static int vdbeSorterCompare( | | | | 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 | ** is assumed that the (pThread->pUnpacked) structure already contains the ** unpacked key to use as key2. ** ** If an OOM error is encountered, (pThread->pUnpacked->error_rc) is set ** to SQLITE_NOMEM. */ static int vdbeSorterCompare( SortSubtask *pThread, /* Thread context (for pKeyInfo) */ const void *pKey1, int nKey1, /* Left side of comparison */ const void *pKey2, int nKey2 /* Right side of comparison */ ){ UnpackedRecord *r2 = pThread->pUnpacked; if( pKey2 ){ sqlite3VdbeRecordUnpack(pThread->pKeyInfo, nKey2, pKey2, r2); } return sqlite3VdbeRecordCompare(nKey1, pKey1, r2, 0); } /* ** This function is called to compare two iterator keys when merging ** multiple b-tree segments. Parameter iOut is the index of the aTree[] ** value to recalculate. */ static int vdbeSorterDoCompare( SortSubtask *pThread, SorterMerger *pMerger, int iOut ){ int i1; int i2; int iRes; VdbeSorterIter *p1; |
︙ | ︙ | |||
543 544 545 546 547 548 549 | if( p1->pFile==0 ){ iRes = i2; }else if( p2->pFile==0 ){ iRes = i1; }else{ int res; | | | 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 | if( p1->pFile==0 ){ iRes = i2; }else if( p2->pFile==0 ){ iRes = i1; }else{ int res; assert( pThread->pUnpacked!=0 ); /* allocated in vdbeSortSubtaskMain() */ res = vdbeSorterCompare( pThread, p1->aKey, p1->nKey, p2->aKey, p2->nKey ); if( res<=0 ){ iRes = i1; }else{ iRes = i2; |
︙ | ︙ | |||
574 575 576 577 578 579 580 | int szKeyInfo; /* Size of pCsr->pKeyInfo in bytes */ int sz; /* Size of pSorter in bytes */ int rc = SQLITE_OK; int nWorker = (sqlite3GlobalConfig.bCoreMutex?sqlite3GlobalConfig.nWorker:0); assert( pCsr->pKeyInfo && pCsr->pBt==0 ); szKeyInfo = sizeof(KeyInfo) + (pCsr->pKeyInfo->nField-1)*sizeof(CollSeq*); | | | | 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 | int szKeyInfo; /* Size of pCsr->pKeyInfo in bytes */ int sz; /* Size of pSorter in bytes */ int rc = SQLITE_OK; int nWorker = (sqlite3GlobalConfig.bCoreMutex?sqlite3GlobalConfig.nWorker:0); assert( pCsr->pKeyInfo && pCsr->pBt==0 ); szKeyInfo = sizeof(KeyInfo) + (pCsr->pKeyInfo->nField-1)*sizeof(CollSeq*); sz = sizeof(VdbeSorter) + nWorker * sizeof(SortSubtask); pSorter = (VdbeSorter*)sqlite3DbMallocZero(db, sz + szKeyInfo); pCsr->pSorter = pSorter; if( pSorter==0 ){ rc = SQLITE_NOMEM; }else{ pKeyInfo = (KeyInfo*)((u8*)pSorter + sz); memcpy(pKeyInfo, pCsr->pKeyInfo, szKeyInfo); pKeyInfo->db = 0; if( nField && nWorker==0 ) pKeyInfo->nField = nField; pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt); pSorter->nThread = nWorker + 1; for(i=0; i<pSorter->nThread; i++){ SortSubtask *pThread = &pSorter->aThread[i]; pThread->pKeyInfo = pKeyInfo; pThread->pVfs = db->pVfs; pThread->pgsz = pgsz; } if( !sqlite3TempInMemory(db) ){ pSorter->mnPmaSize = SORTER_MIN_WORKING * pgsz; |
︙ | ︙ | |||
632 633 634 635 636 637 638 | } } /* ** Free all resources owned by the object indicated by argument pThread. All ** fields of *pThread are zeroed before returning. */ | | | 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 | } } /* ** Free all resources owned by the object indicated by argument pThread. All ** fields of *pThread are zeroed before returning. */ static void vdbeSortSubtaskCleanup(sqlite3 *db, SortSubtask *pThread){ sqlite3DbFree(db, pThread->pUnpacked); pThread->pUnpacked = 0; if( pThread->aListMemory==0 ){ vdbeSorterRecordFree(0, pThread->pList); }else{ sqlite3_free(pThread->aListMemory); pThread->aListMemory = 0; |
︙ | ︙ | |||
656 657 658 659 660 661 662 | ** Join all threads. */ #if SQLITE_MAX_WORKER_THREADS>0 static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){ int rc = rcin; int i; for(i=0; i<pSorter->nThread; i++){ | | | 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 | ** Join all threads. */ #if SQLITE_MAX_WORKER_THREADS>0 static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){ int rc = rcin; int i; for(i=0; i<pSorter->nThread; i++){ SortSubtask *pThread = &pSorter->aThread[i]; if( pThread->pThread ){ void *pRet; int rc2 = sqlite3ThreadJoin(pThread->pThread, &pRet); pThread->pThread = 0; pThread->bDone = 0; if( rc==SQLITE_OK ) rc = rc2; if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet); |
︙ | ︙ | |||
721 722 723 724 725 726 727 | /* ** Reset a sorting cursor back to its original empty state. */ void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){ int i; vdbeSorterJoinAll(pSorter, SQLITE_OK); for(i=0; i<pSorter->nThread; i++){ | | | | 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 | /* ** Reset a sorting cursor back to its original empty state. */ void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){ int i; vdbeSorterJoinAll(pSorter, SQLITE_OK); for(i=0; i<pSorter->nThread; i++){ SortSubtask *pThread = &pSorter->aThread[i]; vdbeSortSubtaskCleanup(db, pThread); } if( pSorter->aMemory==0 ){ vdbeSorterRecordFree(0, pSorter->pRecord); } vdbeSorterMergerReset(pSorter->pMerger); pSorter->pRecord = 0; pSorter->nInMemory = 0; |
︙ | ︙ | |||
772 773 774 775 776 777 778 | } /* ** Merge the two sorted lists p1 and p2 into a single list. ** Set *ppOut to the head of the new list. */ static void vdbeSorterMerge( | | | 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 | } /* ** Merge the two sorted lists p1 and p2 into a single list. ** Set *ppOut to the head of the new list. */ static void vdbeSorterMerge( SortSubtask *pThread, /* Calling thread context */ SorterRecord *p1, /* First list to merge */ SorterRecord *p2, /* Second list to merge */ SorterRecord **ppOut /* OUT: Head of merged list */ ){ SorterRecord *pFinal = 0; SorterRecord **pp = &pFinal; void *pVal2 = p2 ? SRVAL(p2) : 0; |
︙ | ︙ | |||
806 807 808 809 810 811 812 | } /* ** Sort the linked list of records headed at pThread->pList. Return ** SQLITE_OK if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if ** an error occurs. */ | | | 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 | } /* ** Sort the linked list of records headed at pThread->pList. Return ** SQLITE_OK if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if ** an error occurs. */ static int vdbeSorterSort(SortSubtask *pThread){ int i; SorterRecord **aSlot; SorterRecord *p; aSlot = (SorterRecord **)sqlite3MallocZero(64 * sizeof(SorterRecord *)); if( !aSlot ){ return SQLITE_NOMEM; |
︙ | ︙ | |||
970 971 972 973 974 975 976 | ** * A varint. This varint contains the total number of bytes of content ** in the PMA (not including the varint itself). ** ** * One or more records packed end-to-end in order of ascending keys. ** Each record consists of a varint followed by a blob of data (the ** key). The varint is the number of bytes in the blob of data. */ | | | 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 | ** * A varint. This varint contains the total number of bytes of content ** in the PMA (not including the varint itself). ** ** * One or more records packed end-to-end in order of ascending keys. ** Each record consists of a varint followed by a blob of data (the ** key). The varint is the number of bytes in the blob of data. */ static int vdbeSorterListToPMA(SortSubtask *pThread){ int rc = SQLITE_OK; /* Return code */ FileWriter writer; /* Object used to write to the file */ memset(&writer, 0, sizeof(FileWriter)); assert( pThread->nInMemory>0 ); /* If the first temporary PMA file has not been opened, open it now. */ |
︙ | ︙ | |||
1021 1022 1023 1024 1025 1026 1027 | ** Advance the SorterMerger iterator passed as the second argument to ** the next entry. Set *pbEof to true if this means the iterator has ** reached EOF. ** ** Return SQLITE_OK if successful or an error code if an error occurs. */ static int vdbeSorterNext( | | | 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 | ** Advance the SorterMerger iterator passed as the second argument to ** the next entry. Set *pbEof to true if this means the iterator has ** reached EOF. ** ** Return SQLITE_OK if successful or an error code if an error occurs. */ static int vdbeSorterNext( SortSubtask *pThread, SorterMerger *pMerger, int *pbEof ){ int rc; int iPrev = pMerger->aTree[1];/* Index of iterator to advance */ /* Advance the current iterator */ |
︙ | ︙ | |||
1091 1092 1093 1094 1095 1096 1097 | return rc; } /* ** The main routine for sorter-thread operations. */ | | | | 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 | return rc; } /* ** The main routine for sorter-thread operations. */ static void *vdbeSortSubtaskMain(void *pCtx){ int rc = SQLITE_OK; SortSubtask *pThread = (SortSubtask*)pCtx; assert( pThread->eWork==SORTER_THREAD_SORT || pThread->eWork==SORTER_THREAD_TO_PMA || pThread->eWork==SORTER_THREAD_CONS ); assert( pThread->bDone==0 ); |
︙ | ︙ | |||
1212 1213 1214 1215 1216 1217 1218 | return SQLITE_INT_TO_PTR(rc); } /* ** Run the activity scheduled by the object passed as the only argument ** in the current thread. */ | | | | | 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 | return SQLITE_INT_TO_PTR(rc); } /* ** Run the activity scheduled by the object passed as the only argument ** in the current thread. */ static int vdbeSorterRunThread(SortSubtask *pThread){ int rc = SQLITE_PTR_TO_INT( vdbeSortSubtaskMain((void*)pThread) ); assert( pThread->bDone ); pThread->bDone = 0; return rc; } /* ** Flush the current contents of VdbeSorter.pRecord to a new PMA, possibly ** using a background thread. ** ** If argument bFg is non-zero, the operation always uses the calling thread. */ static int vdbeSorterFlushPMA(sqlite3 *db, const VdbeCursor *pCsr, int bFg){ VdbeSorter *pSorter = pCsr->pSorter; int rc = SQLITE_OK; int i; SortSubtask *pThread = 0; /* Thread context used to create new PMA */ int nWorker = (pSorter->nThread-1); pSorter->bUsePMA = 1; for(i=0; i<nWorker; i++){ int iTest = (pSorter->iPrev + i + 1) % nWorker; pThread = &pSorter->aThread[iTest]; #if SQLITE_MAX_WORKER_THREADS>0 |
︙ | ︙ | |||
1283 1284 1285 1286 1287 1288 1289 | if( pSorter->aMemory==0 ){ pSorter->aMemory = sqlite3Malloc(pSorter->nMemory); if( pSorter->aMemory==0 ) return SQLITE_NOMEM; }else{ pSorter->nMemory = sqlite3MallocSize(pSorter->aMemory); } } | | | 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 | if( pSorter->aMemory==0 ){ pSorter->aMemory = sqlite3Malloc(pSorter->nMemory); if( pSorter->aMemory==0 ) return SQLITE_NOMEM; }else{ pSorter->nMemory = sqlite3MallocSize(pSorter->aMemory); } } rc = sqlite3ThreadCreate(&pThread->pThread, vdbeSortSubtaskMain, pCtx); }else #endif { /* Use the foreground thread for this operation */ u8 *aMem; rc = vdbeSorterRunThread(pThread); aMem = pThread->aListMemory; |
︙ | ︙ | |||
1418 1419 1420 1421 1422 1423 1424 | assert( pSorter ); /* If no data has been written to disk, then do not do so now. Instead, ** sort the VdbeSorter.pRecord list. The vdbe layer will read data directly ** from the in-memory list. */ if( pSorter->bUsePMA==0 ){ if( pSorter->pRecord ){ | | | 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 | assert( pSorter ); /* If no data has been written to disk, then do not do so now. Instead, ** sort the VdbeSorter.pRecord list. The vdbe layer will read data directly ** from the in-memory list. */ if( pSorter->bUsePMA==0 ){ if( pSorter->pRecord ){ SortSubtask *pThread = &pSorter->aThread[0]; *pbEof = 0; pThread->pList = pSorter->pRecord; pThread->eWork = SORTER_THREAD_SORT; assert( pThread->aListMemory==0 ); pThread->aListMemory = pSorter->aMemory; rc = vdbeSorterRunThread(pThread); pThread->aListMemory = 0; |
︙ | ︙ | |||
1447 1448 1449 1450 1451 1452 1453 | rc = vdbeSorterJoinAll(pSorter, rc); /* If there are more than SORTER_MAX_MERGE_COUNT PMAs on disk, merge ** some of them together so that this is no longer the case. */ if( vdbeSorterCountPMA(pSorter)>SORTER_MAX_MERGE_COUNT ){ int i; for(i=0; rc==SQLITE_OK && i<pSorter->nThread; i++){ | | | | 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 | rc = vdbeSorterJoinAll(pSorter, rc); /* If there are more than SORTER_MAX_MERGE_COUNT PMAs on disk, merge ** some of them together so that this is no longer the case. */ if( vdbeSorterCountPMA(pSorter)>SORTER_MAX_MERGE_COUNT ){ int i; for(i=0; rc==SQLITE_OK && i<pSorter->nThread; i++){ SortSubtask *pThread = &pSorter->aThread[i]; if( pThread->pTemp1 ){ pThread->nConsolidate = SORTER_MAX_MERGE_COUNT / pSorter->nThread; pThread->eWork = SORTER_THREAD_CONS; #if SQLITE_MAX_WORKER_THREADS>0 if( i<(pSorter->nThread-1) ){ void *pCtx = (void*)pThread; rc = sqlite3ThreadCreate(&pThread->pThread,vdbeSortSubtaskMain,pCtx); }else #endif { rc = vdbeSorterRunThread(pThread); } } } |
︙ | ︙ | |||
1488 1489 1490 1491 1492 1493 1494 | rc = SQLITE_NOMEM; }else{ int iIter = 0; int iThread = 0; for(iThread=0; iThread<pSorter->nThread; iThread++){ int iPMA; i64 iReadOff = 0; | | | 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 | rc = SQLITE_NOMEM; }else{ int iIter = 0; int iThread = 0; for(iThread=0; iThread<pSorter->nThread; iThread++){ int iPMA; i64 iReadOff = 0; SortSubtask *pThread = &pSorter->aThread[iThread]; for(iPMA=0; iPMA<pThread->nPMA && rc==SQLITE_OK; iPMA++){ i64 nDummy = 0; VdbeSorterIter *pIter = &pMerger->aIter[iIter++]; rc = vdbeSorterIterInit(pThread, iReadOff, pIter, &nDummy); iReadOff = pIter->iEof; } } |
︙ | ︙ |