/ Check-in [78c7ec95]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Reorganize some multi-threaded code in vdbesort.c so that full MC/DC test coverage does not depend on the outcome of a race condition.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA1: 78c7ec95931265b89a92f6a799fc9b1a9f0476bf
User & Date: dan 2015-05-02 12:40:12
Context
2015-05-02
17:40
Add the ".limit" command to the command-line shell. check-in: 803cb60e user: drh tags: trunk
12:40
Reorganize some multi-threaded code in vdbesort.c so that full MC/DC test coverage does not depend on the outcome of a race condition. check-in: 78c7ec95 user: dan tags: trunk
11:45
Cleanup of the sqlite3StrAccumInit() function. No functionality changes. check-in: 7952c322 user: drh tags: trunk
Changes
Hide Diffs Side-by-Side Diffs Ignore Whitespace Patch

Changes to src/vdbesort.c.

  2059   2059   ** SQLITE_MAX_WORKER_THREADS==0).  The other values are only used
  2060   2060   ** when there exists one or more separate worker threads.
  2061   2061   */
  2062   2062   #define INCRINIT_NORMAL 0
  2063   2063   #define INCRINIT_TASK   1
  2064   2064   #define INCRINIT_ROOT   2
  2065   2065   
  2066         -/* Forward reference.
  2067         -** The vdbeIncrMergeInit() and vdbePmaReaderIncrMergeInit() routines call each
  2068         -** other (when building a merge tree).
         2066  +/* 
         2067  +** Forward reference required as the vdbeIncrMergeInit() and
         2068  +** vdbePmaReaderIncrInit() routines are called mutually recursively when
         2069  +** building a merge tree.
  2069   2070   */
  2070         -static int vdbePmaReaderIncrMergeInit(PmaReader *pReadr, int eMode);
         2071  +static int vdbePmaReaderIncrInit(PmaReader *pReadr, int eMode);
  2071   2072   
  2072   2073   /*
  2073   2074   ** Initialize the MergeEngine object passed as the second argument. Once this
  2074   2075   ** function returns, the first key of merged data may be read from the 
  2075   2076   ** MergeEngine object in the usual fashion.
  2076   2077   **
  2077   2078   ** If argument eMode is INCRINIT_ROOT, then it is assumed that any IncrMerge
................................................................................
  2110   2111         ** However, in the INCRINIT_ROOT case, if PmaReader aReadr[nTask-1] is
  2111   2112         ** in use it will block the vdbePmaReaderNext() call while it uses
  2112   2113         ** the main thread to fill its buffer. So calling PmaReaderNext()
  2113   2114         ** on this PmaReader before any of the multi-threaded PmaReaders takes
  2114   2115         ** better advantage of multi-processor hardware. */
  2115   2116         rc = vdbePmaReaderNext(&pMerger->aReadr[nTree-i-1]);
  2116   2117       }else{
  2117         -      rc = vdbePmaReaderIncrMergeInit(&pMerger->aReadr[i], INCRINIT_NORMAL);
         2118  +      rc = vdbePmaReaderIncrInit(&pMerger->aReadr[i], INCRINIT_NORMAL);
  2118   2119       }
  2119   2120       if( rc!=SQLITE_OK ) return rc;
  2120   2121     }
  2121   2122   
  2122   2123     for(i=pMerger->nTree-1; i>0; i--){
  2123   2124       vdbeMergeEngineCompare(pMerger, i);
  2124   2125     }
  2125   2126     return pTask->pUnpacked->errCode;
  2126   2127   }
  2127   2128   
  2128   2129   /*
  2129         -** Initialize the IncrMerge field of a PmaReader.
  2130         -**
  2131         -** If the PmaReader passed as the first argument is not an incremental-reader
  2132         -** (if pReadr->pIncr==0), then this function is a no-op. Otherwise, it serves
  2133         -** to open and/or initialize the temp file related fields of the IncrMerge
         2130  +** The PmaReader passed as the first argument is guaranteed to be an
         2131  +** incremental-reader (pReadr->pIncr!=0). This function serves to open
         2132  +** and/or initialize the temp file related fields of the IncrMerge
  2134   2133   ** object at (pReadr->pIncr).
  2135   2134   **
  2136   2135   ** If argument eMode is set to INCRINIT_NORMAL, then all PmaReaders
  2137         -** in the sub-tree headed by pReadr are also initialized. Data is then loaded
  2138         -** into the buffers belonging to pReadr and it is set to
  2139         -** point to the first key in its range.
         2136  +** in the sub-tree headed by pReadr are also initialized. Data is then 
         2137  +** loaded into the buffers belonging to pReadr and it is set to point to 
         2138  +** the first key in its range.
  2140   2139   **
  2141   2140   ** If argument eMode is set to INCRINIT_TASK, then pReadr is guaranteed
  2142   2141   ** to be a multi-threaded PmaReader and this function is being called in a
  2143   2142   ** background thread. In this case all PmaReaders in the sub-tree are 
  2144   2143   ** initialized as for INCRINIT_NORMAL and the aFile[1] buffer belonging to
  2145   2144   ** pReadr is populated. However, pReadr itself is not set up to point
  2146   2145   ** to its first key. A call to vdbePmaReaderNext() is still required to do
................................................................................
  2159   2158   ** the current PmaReader set to point to the first key in its range.
  2160   2159   **
  2161   2160   ** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
  2162   2161   */
  2163   2162   static int vdbePmaReaderIncrMergeInit(PmaReader *pReadr, int eMode){
  2164   2163     int rc = SQLITE_OK;
  2165   2164     IncrMerger *pIncr = pReadr->pIncr;
         2165  +  SortSubtask *pTask = pIncr->pTask;
         2166  +  sqlite3 *db = pTask->pSorter->db;
  2166   2167   
  2167   2168     /* eMode is always INCRINIT_NORMAL in single-threaded mode */
  2168   2169     assert( SQLITE_MAX_WORKER_THREADS>0 || eMode==INCRINIT_NORMAL );
  2169   2170   
  2170         -  if( pIncr ){
  2171         -    SortSubtask *pTask = pIncr->pTask;
  2172         -    sqlite3 *db = pTask->pSorter->db;
         2171  +  rc = vdbeMergeEngineInit(pTask, pIncr->pMerger, eMode);
  2173   2172   
  2174         -    rc = vdbeMergeEngineInit(pTask, pIncr->pMerger, eMode);
  2175         -
  2176         -    /* Set up the required files for pIncr. A multi-theaded IncrMerge object
  2177         -    ** requires two temp files to itself, whereas a single-threaded object
  2178         -    ** only requires a region of pTask->file2. */
  2179         -    if( rc==SQLITE_OK ){
  2180         -      int mxSz = pIncr->mxSz;
         2173  +  /* Set up the required files for pIncr. A multi-theaded IncrMerge object
         2174  +  ** requires two temp files to itself, whereas a single-threaded object
         2175  +  ** only requires a region of pTask->file2. */
         2176  +  if( rc==SQLITE_OK ){
         2177  +    int mxSz = pIncr->mxSz;
  2181   2178   #if SQLITE_MAX_WORKER_THREADS>0
  2182         -      if( pIncr->bUseThread ){
  2183         -        rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[0].pFd);
  2184         -        if( rc==SQLITE_OK ){
  2185         -          rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[1].pFd);
  2186         -        }
  2187         -      }else
         2179  +    if( pIncr->bUseThread ){
         2180  +      rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[0].pFd);
         2181  +      if( rc==SQLITE_OK ){
         2182  +        rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[1].pFd);
         2183  +      }
         2184  +    }else
  2188   2185   #endif
  2189         -      /*if( !pIncr->bUseThread )*/{
  2190         -        if( pTask->file2.pFd==0 ){
  2191         -          assert( pTask->file2.iEof>0 );
  2192         -          rc = vdbeSorterOpenTempFile(db, pTask->file2.iEof, &pTask->file2.pFd);
  2193         -          pTask->file2.iEof = 0;
  2194         -        }
  2195         -        if( rc==SQLITE_OK ){
  2196         -          pIncr->aFile[1].pFd = pTask->file2.pFd;
  2197         -          pIncr->iStartOff = pTask->file2.iEof;
  2198         -          pTask->file2.iEof += mxSz;
  2199         -        }
         2186  +    /*if( !pIncr->bUseThread )*/{
         2187  +      if( pTask->file2.pFd==0 ){
         2188  +        assert( pTask->file2.iEof>0 );
         2189  +        rc = vdbeSorterOpenTempFile(db, pTask->file2.iEof, &pTask->file2.pFd);
         2190  +        pTask->file2.iEof = 0;
         2191  +      }
         2192  +      if( rc==SQLITE_OK ){
         2193  +        pIncr->aFile[1].pFd = pTask->file2.pFd;
         2194  +        pIncr->iStartOff = pTask->file2.iEof;
         2195  +        pTask->file2.iEof += mxSz;
  2200   2196         }
  2201   2197       }
         2198  +  }
  2202   2199   
  2203   2200   #if SQLITE_MAX_WORKER_THREADS>0
  2204         -    if( rc==SQLITE_OK && pIncr->bUseThread ){
  2205         -      /* Use the current thread to populate aFile[1], even though this
  2206         -      ** PmaReader is multi-threaded. The reason being that this function
  2207         -      ** is already running in background thread pIncr->pTask->thread. */
  2208         -      assert( eMode==INCRINIT_ROOT || eMode==INCRINIT_TASK );
  2209         -      rc = vdbeIncrPopulate(pIncr);
  2210         -    }
         2201  +  if( rc==SQLITE_OK && pIncr->bUseThread ){
         2202  +    /* Use the current thread to populate aFile[1], even though this
         2203  +    ** PmaReader is multi-threaded. If this is an INCRINIT_TASK object,
         2204  +    ** then this function is already running in background thread 
         2205  +    ** pIncr->pTask->thread. 
         2206  +    **
         2207  +    ** If this is the INCRINIT_ROOT object, then it is running in the 
         2208  +    ** main VDBE thread. But that is Ok, as that thread cannot return
         2209  +    ** control to the VDBE or proceed with anything useful until the 
         2210  +    ** first results are ready from this merger object anyway.
         2211  +    */
         2212  +    assert( eMode==INCRINIT_ROOT || eMode==INCRINIT_TASK );
         2213  +    rc = vdbeIncrPopulate(pIncr);
         2214  +  }
  2211   2215   #endif
  2212   2216   
  2213         -    if( rc==SQLITE_OK
  2214         -     && (SQLITE_MAX_WORKER_THREADS==0 || eMode!=INCRINIT_TASK)
  2215         -    ){
  2216         -      rc = vdbePmaReaderNext(pReadr);
  2217         -    }
         2217  +  if( rc==SQLITE_OK && (SQLITE_MAX_WORKER_THREADS==0 || eMode!=INCRINIT_TASK) ){
         2218  +    rc = vdbePmaReaderNext(pReadr);
  2218   2219     }
         2220  +
  2219   2221     return rc;
  2220   2222   }
  2221   2223   
  2222   2224   #if SQLITE_MAX_WORKER_THREADS>0
  2223   2225   /*
  2224   2226   ** The main routine for vdbePmaReaderIncrMergeInit() operations run in 
  2225   2227   ** background threads.
  2226   2228   */
  2227         -static void *vdbePmaReaderBgInit(void *pCtx){
         2229  +static void *vdbePmaReaderBgIncrInit(void *pCtx){
  2228   2230     PmaReader *pReader = (PmaReader*)pCtx;
  2229   2231     void *pRet = SQLITE_INT_TO_PTR(
  2230   2232                     vdbePmaReaderIncrMergeInit(pReader,INCRINIT_TASK)
  2231   2233                  );
  2232   2234     pReader->pIncr->pTask->bDone = 1;
  2233   2235     return pRet;
  2234   2236   }
         2237  +#endif
  2235   2238   
  2236   2239   /*
  2237         -** Use a background thread to invoke vdbePmaReaderIncrMergeInit(INCRINIT_TASK) 
  2238         -** on the PmaReader object passed as the first argument.
  2239         -**
  2240         -** This call will initialize the various fields of the pReadr->pIncr 
  2241         -** structure and, if it is a multi-threaded IncrMerger, launch a 
  2242         -** background thread to populate aFile[1].
         2240  +** If the PmaReader passed as the first argument is not an incremental-reader
         2241  +** (if pReadr->pIncr==0), then this function is a no-op. Otherwise, it invokes
         2242  +** the vdbePmaReaderIncrMergeInit() function with the parameters passed to
         2243  +** this routine to initialize the incremental merge.
         2244  +** 
         2245  +** If the IncrMerger object is multi-threaded (IncrMerger.bUseThread==1), 
         2246  +** then a background thread is launched to call vdbePmaReaderIncrMergeInit().
         2247  +** Or, if the IncrMerger is single threaded, the same function is called
         2248  +** using the current thread.
  2243   2249   */
  2244         -static int vdbePmaReaderBgIncrInit(PmaReader *pReadr){
  2245         -  void *pCtx = (void*)pReadr;
  2246         -  return vdbeSorterCreateThread(pReadr->pIncr->pTask, vdbePmaReaderBgInit, pCtx);
  2247         -}
         2250  +static int vdbePmaReaderIncrInit(PmaReader *pReadr, int eMode){
         2251  +  IncrMerger *pIncr = pReadr->pIncr;   /* Incremental merger */
         2252  +  int rc = SQLITE_OK;                  /* Return code */
         2253  +  if( pIncr ){
         2254  +#if SQLITE_MAX_WORKER_THREADS>0
         2255  +    assert( pIncr->bUseThread==0 || eMode==INCRINIT_TASK );
         2256  +    if( pIncr->bUseThread ){
         2257  +      void *pCtx = (void*)pReadr;
         2258  +      rc = vdbeSorterCreateThread(pIncr->pTask, vdbePmaReaderBgIncrInit, pCtx);
         2259  +    }else
  2248   2260   #endif
         2261  +    {
         2262  +      rc = vdbePmaReaderIncrMergeInit(pReadr, eMode);
         2263  +    }
         2264  +  }
         2265  +  return rc;
         2266  +}
  2249   2267   
  2250   2268   /*
  2251   2269   ** Allocate a new MergeEngine object to merge the contents of nPMA level-0
  2252   2270   ** PMAs from pTask->file. If no error occurs, set *ppOut to point to
  2253   2271   ** the new object and return SQLITE_OK. Or, if an error does occur, set *ppOut
  2254   2272   ** to NULL and return an SQLite error code.
  2255   2273   **
................................................................................
  2486   2504               IncrMerger *pIncr;
  2487   2505               if( (pIncr = pMain->aReadr[iTask].pIncr) ){
  2488   2506                 vdbeIncrMergerSetThreads(pIncr);
  2489   2507                 assert( pIncr->pTask!=pLast );
  2490   2508               }
  2491   2509             }
  2492   2510             for(iTask=0; rc==SQLITE_OK && iTask<pSorter->nTask; iTask++){
         2511  +            /* Check that:
         2512  +            **   
         2513  +            **   a) The incremental merge object is configured to use the
         2514  +            **      right task, and
         2515  +            **   b) If it is using task (nTask-1), it is configured to run
         2516  +            **      in single-threaded mode. This is important, as the
         2517  +            **      root merge (INCRINIT_ROOT) will be using the same task
         2518  +            **      object.
         2519  +            */
  2493   2520               PmaReader *p = &pMain->aReadr[iTask];
  2494         -            assert( p->pIncr==0 || p->pIncr->pTask==&pSorter->aTask[iTask] );
  2495         -            if( p->pIncr ){ 
  2496         -              if( iTask==pSorter->nTask-1 ){
  2497         -                rc = vdbePmaReaderIncrMergeInit(p, INCRINIT_TASK);
  2498         -              }else{
  2499         -                rc = vdbePmaReaderBgIncrInit(p);
  2500         -              }
  2501         -            }
         2521  +            assert( p->pIncr==0 || (
         2522  +                (p->pIncr->pTask==&pSorter->aTask[iTask])             /* a */
         2523  +             && (iTask!=pSorter->nTask-1 || p->pIncr->bUseThread==0)  /* b */
         2524  +            ));
         2525  +            rc = vdbePmaReaderIncrInit(p, INCRINIT_TASK);
  2502   2526             }
  2503   2527           }
  2504   2528           pMain = 0;
  2505   2529         }
  2506   2530         if( rc==SQLITE_OK ){
  2507   2531           rc = vdbePmaReaderIncrMergeInit(pReadr, INCRINIT_ROOT);
  2508   2532         }