/ Check-in [9af50a87]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Improvements to comments in the multi-threaded sorter. Also include a function name change for clarity. And add a test to help show that the MergeEngine object is only used by a single thread.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | threads
Files: files | file ages | folders
SHA1:9af50a878f67c1c2a4f1520160cc989650d7196a
User & Date: drh 2014-07-28 14:54:50
Context
2014-07-28
15:01
Merge recent trunk changes into the threads branch. check-in: 163c247b user: drh tags: threads
14:54
Improvements to comments in the multi-threaded sorter. Also include a function name change for clarity. And add a test to help show that the MergeEngine object is only used by a single thread. check-in: 9af50a87 user: drh tags: threads
2014-07-24
16:54
Merge all recent trunk changes into the threads branch. check-in: 77068589 user: drh tags: threads
Changes
Hide Diffs Side-by-Side Diffs Ignore Whitespace Patch

Changes to src/vdbesort.c.

    81     81   ** A PMA created at this point is known as a "level-0 PMA". Higher levels
    82     82   ** of PMAs may be created by merging existing PMAs together - for example
    83     83   ** merging two or more level-0 PMAs together creates a level-1 PMA.
    84     84   **
    85     85   ** The threshold for the amount of main memory to use before flushing 
    86     86   ** records to a PMA is roughly the same as the limit configured for the
    87     87   ** page-cache of the main database. Specifically, the threshold is set to 
    88         -** the value returned multiplied by "PRAGMA main.page_size" multipled by 
           88  +** the value returned by "PRAGMA main.page_size" multipled by 
    89     89   ** that returned by "PRAGMA main.cache_size", in bytes.
    90     90   **
    91     91   ** If the sorter is running in single-threaded mode, then all PMAs generated
    92     92   ** are appended to a single temporary file. Or, if the sorter is running in
    93     93   ** multi-threaded mode then up to (N+1) temporary files may be opened, where
    94     94   ** N is the configured number of worker threads. In this case, instead of
    95     95   ** sorting the records and writing the PMA to a temporary file itself, the
................................................................................
   186    186   ** The MergeEngine object is used to combine two or more smaller PMAs into
   187    187   ** one big PMA using a merge operation.  Separate PMAs all need to be
   188    188   ** combined into one big PMA in order to be able to step through the sorted
   189    189   ** records in order.
   190    190   **
   191    191   ** The aReadr[] array contains a PmaReader object for each of the PMAs being
   192    192   ** merged.  An aReadr[] object either points to a valid key or else is at EOF.
          193  +** ("EOF" means "End Of File".  When aReadr[] is at EOF there is no more data.)
   193    194   ** For the purposes of the paragraphs below, we assume that the array is
   194    195   ** actually N elements in size, where N is the smallest power of 2 greater
   195    196   ** to or equal to the number of PMAs being merged. The extra aReadr[] elements
   196    197   ** are treated as if they are empty (always at EOF).
   197    198   **
   198    199   ** The aTree[] array is also N elements in size. The value of N is stored in
   199    200   ** the MergeEngine.nTree variable.
................................................................................
   243    244   **
   244    245   ** In other words, each time we advance to the next sorter element, log2(N)
   245    246   ** key comparison operations are required, where N is the number of segments
   246    247   ** being merged (rounded up to the next power of 2).
   247    248   */
   248    249   struct MergeEngine {
   249    250     int nTree;                 /* Used size of aTree/aReadr (power of 2) */
          251  +  SortSubtask *pTask;        /* Used by this thread only */
   250    252     int *aTree;                /* Current state of incremental merge */
   251    253     PmaReader *aReadr;         /* Array of PmaReaders to merge data from */
   252    254   };
   253    255   
   254    256   /*
          257  +** This object represents a single thread of control in a sort operation.
   255    258   ** Exactly VdbeSorter.nTask instances of this object are allocated
   256    259   ** as part of each VdbeSorter object. Instances are never allocated any
   257    260   ** other way. VdbeSorter.nTask is set to the number of worker threads allowed
   258         -** (see SQLITE_CONFIG_WORKER_THREADS) plus one (the main thread).
          261  +** (see SQLITE_CONFIG_WORKER_THREADS) plus one (the main thread).  Thus for
          262  +** single-threaded operation, there is exactly one instance of this object
          263  +** and for multi-threaded operation there are two or more instances.
   259    264   **
   260    265   ** Essentially, this structure contains all those fields of the VdbeSorter
   261    266   ** structure for which each thread requires a separate instance. For example,
   262    267   ** each thread requries its own UnpackedRecord object to unpack records in
   263    268   ** as part of comparison operations.
   264    269   **
   265    270   ** Before a background thread is launched, variable bDone is set to 0. Then, 
................................................................................
   439    444   /* Maximum number of PMAs that a single MergeEngine can merge */
   440    445   #define SORTER_MAX_MERGE_COUNT 16
   441    446   
   442    447   static int vdbeIncrSwap(IncrMerger*);
   443    448   static void vdbeIncrFree(IncrMerger *);
   444    449   
   445    450   /*
   446         -** Free all memory belonging to the PmaReader object passed as the second
          451  +** Free all memory belonging to the PmaReader object passed as the
   447    452   ** argument. All structure fields are set to zero before returning.
   448    453   */
   449    454   static void vdbePmaReaderClear(PmaReader *pReadr){
   450    455     sqlite3_free(pReadr->aAlloc);
   451    456     sqlite3_free(pReadr->aBuffer);
   452    457     if( pReadr->aMap ) sqlite3OsUnfetch(pReadr->pFile, 0, pReadr->aMap);
   453    458     vdbeIncrFree(pReadr->pIncr);
   454    459     memset(pReadr, 0, sizeof(PmaReader));
   455    460   }
   456    461   
   457    462   /*
   458         -** Read nByte bytes of data from the stream of data iterated by object p.
          463  +** Read the next nByte bytes of data from the PMA p.
   459    464   ** If successful, set *ppOut to point to a buffer containing the data
   460    465   ** and return SQLITE_OK. Otherwise, if an error occurs, return an SQLite
   461    466   ** error code.
   462    467   **
   463         -** The buffer indicated by *ppOut may only be considered valid until the
          468  +** The buffer returned in *ppOut is only valid until the
   464    469   ** next call to this function.
   465    470   */
   466    471   static int vdbePmaReadBlob(
   467    472     PmaReader *p,                   /* PmaReader from which to take the blob */
   468    473     int nByte,                      /* Bytes of data to read */
   469    474     u8 **ppOut                      /* OUT: Pointer to buffer containing data */
   470    475   ){
................................................................................
   590    595   ** Or, if an error occurs, return an SQLite error code. The final value of
   591    596   ** *pp is undefined in this case.
   592    597   */
   593    598   static int vdbeSorterMapFile(SortSubtask *pTask, SorterFile *pFile, u8 **pp){
   594    599     int rc = SQLITE_OK;
   595    600     if( pFile->iEof<=(i64)(pTask->pSorter->db->nMaxSorterMmap) ){
   596    601       rc = sqlite3OsFetch(pFile->pFd, 0, (int)pFile->iEof, (void**)pp);
          602  +    testcase( rc!=SQLITE_OK );
   597    603     }
   598    604     return rc;
   599    605   }
   600    606   
   601    607   /*
   602    608   ** Seek PmaReader pReadr to offset iOff within file pFile. Return SQLITE_OK 
   603    609   ** if successful, or an SQLite error code if an error occurs.
   604    610   */
   605    611   static int vdbePmaReaderSeek(
   606    612     SortSubtask *pTask,             /* Task context */
   607         -  PmaReader *pReadr,              /* Iterate to populate */
          613  +  PmaReader *pReadr,              /* Reader whose cursor is to be moved */
   608    614     SorterFile *pFile,              /* Sorter file to read from */
   609    615     i64 iOff                        /* Offset in pFile */
   610    616   ){
   611    617     int rc = SQLITE_OK;
   612    618   
   613    619     assert( pReadr->pIncr==0 || pReadr->pIncr->bEof==0 );
   614    620   
................................................................................
   633    639         int nRead = pgsz - iBuf;
   634    640         if( (pReadr->iReadOff + nRead) > pReadr->iEof ){
   635    641           nRead = (int)(pReadr->iEof - pReadr->iReadOff);
   636    642         }
   637    643         rc = sqlite3OsRead(
   638    644             pReadr->pFile, &pReadr->aBuffer[iBuf], nRead, pReadr->iReadOff
   639    645         );
          646  +      testcase( rc!=SQLITE_OK );
   640    647       }
   641    648     }
   642    649   
   643    650     return rc;
   644    651   }
   645    652   
   646    653   /*
................................................................................
   664    671           bEof = 0;
   665    672         }
   666    673       }
   667    674   
   668    675       if( bEof ){
   669    676         /* This is an EOF condition */
   670    677         vdbePmaReaderClear(pReadr);
          678  +      testcase( rc!=SQLITE_OK );
   671    679         return rc;
   672    680       }
   673    681     }
   674    682   
   675    683     if( rc==SQLITE_OK ){
   676    684       rc = vdbePmaReadVarint(pReadr, &nRec);
   677    685     }
   678    686     if( rc==SQLITE_OK ){
   679    687       pReadr->nKey = (int)nRec;
   680    688       rc = vdbePmaReadBlob(pReadr, (int)nRec, &pReadr->aKey);
          689  +    testcase( rc!=SQLITE_OK );
   681    690     }
   682    691   
   683    692     return rc;
   684    693   }
   685    694   
   686    695   /*
   687    696   ** Initialize PmaReader pReadr to scan through the PMA stored in file pFile
................................................................................
  1022   1031   }
  1023   1032   #else
  1024   1033   # define vdbeSorterJoinAll(x,rcin) (rcin)
  1025   1034   # define vdbeSorterJoinThread(pTask) SQLITE_OK
  1026   1035   #endif
  1027   1036   
  1028   1037   /*
  1029         -** Allocate a new MergeEngine object with space for nReader PmaReaders.
         1038  +** Allocate a new MergeEngine object capable of handling up to
         1039  +** nReader PmaReader inputs.
         1040  +**
         1041  +** nReader is automatically rounded up to the next power of two.
         1042  +** nReader may not exceed SORTER_MAX_MERGE_COUNT even after rounding up.
  1030   1043   */
  1031   1044   static MergeEngine *vdbeMergeEngineNew(int nReader){
  1032   1045     int N = 2;                      /* Smallest power of two >= nReader */
  1033   1046     int nByte;                      /* Total bytes of space to allocate */
  1034   1047     MergeEngine *pNew;              /* Pointer to allocated object to return */
  1035   1048   
  1036   1049     assert( nReader<=SORTER_MAX_MERGE_COUNT );
................................................................................
  1037   1050   
  1038   1051     while( N<nReader ) N += N;
  1039   1052     nByte = sizeof(MergeEngine) + N * (sizeof(int) + sizeof(PmaReader));
  1040   1053   
  1041   1054     pNew = sqlite3FaultSim(100) ? 0 : (MergeEngine*)sqlite3MallocZero(nByte);
  1042   1055     if( pNew ){
  1043   1056       pNew->nTree = N;
         1057  +    pNew->pTask = 0;
  1044   1058       pNew->aReadr = (PmaReader*)&pNew[1];
  1045   1059       pNew->aTree = (int*)&pNew->aReadr[N];
  1046   1060     }
  1047   1061     return pNew;
  1048   1062   }
  1049   1063   
  1050   1064   /*
................................................................................
  1434   1448     vdbeSorterWorkDebug(pTask, "exit");
  1435   1449     assert( rc!=SQLITE_OK || pList->pList==0 );
  1436   1450     assert( rc!=SQLITE_OK || pTask->file.iEof==iSz );
  1437   1451     return rc;
  1438   1452   }
  1439   1453   
  1440   1454   /*
  1441         -** Advance the MergeEngine PmaReader passed as the second argument to
  1442         -** the next entry. Set *pbEof to true if this means the PmaReader has 
  1443         -** reached EOF.
         1455  +** Advance the MergeEngine pMerge (passed as the second argument) to
         1456  +** its next entry.  Set *pbEof to true there is no next entry because
         1457  +** the MergeEngine has reached the end of all its inputs.
  1444   1458   **
  1445   1459   ** Return SQLITE_OK if successful or an error code if an error occurs.
  1446   1460   */
  1447         -static int vdbeSorterNext(
  1448         -  SortSubtask *pTask, 
  1449         -  MergeEngine *pMerger, 
  1450         -  int *pbEof
         1461  +static int vdbeMergeEngineStep(
         1462  +  SortSubtask *pTask,        /* The thread in which this MergeEngine runs */
         1463  +  MergeEngine *pMerger,      /* The merge engine to advance to the next row */
         1464  +  int *pbEof                 /* Set TRUE at EOF.  Set false for more content */
  1451   1465   ){
  1452   1466     int rc;
  1453   1467     int iPrev = pMerger->aTree[1];/* Index of PmaReader to advance */
         1468  +
         1469  +  /* A MergeEngine object is only used by a single thread */
         1470  +  assert( pMerger->pTask==0 || pMerger->pTask==pTask );
         1471  +  pMerger->pTask = pTask;
  1454   1472   
  1455   1473     /* Advance the current PmaReader */
  1456   1474     rc = vdbePmaReaderNext(&pMerger->aReadr[iPrev]);
  1457   1475   
  1458   1476     /* Update contents of aTree[] */
  1459   1477     if( rc==SQLITE_OK ){
  1460   1478       int i;                      /* Index of aTree[] to recalculate */
................................................................................
  1716   1734       ** In either case exit the loop. */
  1717   1735       if( pReader->pFile==0 ) break;
  1718   1736       if( (iEof + nKey + sqlite3VarintLen(nKey))>(iStart + pIncr->mxSz) ) break;
  1719   1737   
  1720   1738       /* Write the next key to the output. */
  1721   1739       vdbePmaWriteVarint(&writer, nKey);
  1722   1740       vdbePmaWriteBlob(&writer, pReader->aKey, nKey);
  1723         -    rc = vdbeSorterNext(pTask, pIncr->pMerger, &dummy);
         1741  +    rc = vdbeMergeEngineStep(pTask, pIncr->pMerger, &dummy);
  1724   1742     }
  1725   1743   
  1726   1744     rc2 = vdbePmaWriterFinish(&writer, &pOut->iEof);
  1727   1745     if( rc==SQLITE_OK ) rc = rc2;
  1728   1746     vdbeSorterPopulateDebug(pTask, "exit");
  1729   1747     return rc;
  1730   1748   }
................................................................................
  2124   2142   ** can be used to incrementally merge all PMAs on disk.
  2125   2143   **
  2126   2144   ** If successful, SQLITE_OK is returned and *ppOut set to point to the
  2127   2145   ** MergeEngine object at the root of the tree before returning. Or, if an
  2128   2146   ** error occurs, an SQLite error code is returned and the final value 
  2129   2147   ** of *ppOut is undefined.
  2130   2148   */
  2131         -static int vdbeSorterMergeTreeBuild(VdbeSorter *pSorter, MergeEngine **ppOut){
         2149  +static int vdbeSorterMergeTreeBuild(
         2150  +  VdbeSorter *pSorter,       /* The VDBE cursor that implements the sort */
         2151  +  MergeEngine **ppOut        /* Write the MergeEngine here */
         2152  +){
  2132   2153     MergeEngine *pMain = 0;
  2133   2154     int rc = SQLITE_OK;
  2134   2155     int iTask;
  2135   2156   
  2136   2157   #if SQLITE_MAX_WORKER_THREADS>0
  2137   2158     /* If the sorter uses more than one task, then create the top-level 
  2138   2159     ** MergeEngine here. This MergeEngine will read data from exactly 
................................................................................
  2332   2353   #if SQLITE_MAX_WORKER_THREADS>0
  2333   2354       if( pSorter->bUseThreads ){
  2334   2355         rc = vdbePmaReaderNext(pSorter->pReader);
  2335   2356         *pbEof = (pSorter->pReader->pFile==0);
  2336   2357       }else
  2337   2358   #endif
  2338   2359       /*if( !pSorter->bUseThreads )*/ {
  2339         -      rc = vdbeSorterNext(&pSorter->aTask[0], pSorter->pMerger, pbEof);
         2360  +      rc = vdbeMergeEngineStep(&pSorter->aTask[0], pSorter->pMerger, pbEof);
  2340   2361       }
  2341   2362     }else{
  2342   2363       SorterRecord *pFree = pSorter->list.pList;
  2343   2364       pSorter->list.pList = pFree->u.pNext;
  2344   2365       pFree->u.pNext = 0;
  2345   2366       if( pSorter->list.aMemory==0 ) vdbeSorterRecordFree(db, pFree);
  2346   2367       *pbEof = !pSorter->list.pList;