/ Check-in [e1bdc4b8]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Improve use of multiple threads in sqlite3VdbeSorterRewind().
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | threads-experimental
Files: files | file ages | folders
SHA1:e1bdc4b810907cc0e55e0c923c8ebc777068cfe0
User & Date: dan 2014-04-14 07:30:39
Context
2014-04-14
08:45
Minor fixes so that builds with SQLITE_MAX_WORKER_THREADS=0 work. check-in: e400bbbf user: dan tags: threads-experimental
07:30
Improve use of multiple threads in sqlite3VdbeSorterRewind(). check-in: e1bdc4b8 user: dan tags: threads-experimental
2014-04-12
19:34
Fix many issues with new code. check-in: 62c406a0 user: dan tags: threads-experimental
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to src/vdbesort.c.

1706
1707
1708
1709
1710
1711
1712



1713
1714
1715
1716
1717
1718
1719
1720
1721




1722

1723
1724
1725
1726
1727
1728
1729
....
1747
1748
1749
1750
1751
1752
1753


1754
1755
1756
1757
1758
1759
1760
1761
1762














1763
1764
1765
1766
1767
1768
1769
....
1929
1930
1931
1932
1933
1934
1935







1936
1937
1938


1939

1940
1941
1942
1943
1944
1945
1946
static void vdbeIncrSetThreads(IncrMerger *pIncr, int bUseThread){
  if( bUseThread ){
    pIncr->bUseThread = 1;
    pIncr->pTask->file2.iEof -= pIncr->mxSz;
  }
}




static int vdbeIncrInit2(PmaReader *pIter){
  int rc = SQLITE_OK;
  IncrMerger *pIncr = pIter->pIncr;
  if( pIncr ){
    SortSubtask *pTask = pIncr->pTask;
    int i;
    MergeEngine *pMerger = pIncr->pMerger;

    for(i=0; rc==SQLITE_OK && i<pMerger->nTree; i++){




      rc = vdbeIncrInit2(&pMerger->aIter[i]);

    }

    /* Set up the required files for pIncr */
    if( rc==SQLITE_OK ){
      if( pIncr->bUseThread==0 ){
        if( pTask->file2.pFd==0 ){
          rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTask->file2.pFd);
................................................................................
    }

    for(i=pMerger->nTree-1; rc==SQLITE_OK && i>0; i--){
      rc = vdbeSorterDoCompare(pIncr->pTask, pMerger, i);
    }

    if( rc==SQLITE_OK && pIncr->bUseThread ){


      rc = vdbeIncrBgPopulate(pIncr);
    }

    if( rc==SQLITE_OK ){
      rc = vdbePmaReaderNext(pIter);
    }
  }
  return rc;
}















/*
** Allocate a new MergeEngine object to merge the contents of nPMA level-0
** PMAs from pTask->file. If no error occurs, set *ppOut to point to
** the new object and return SQLITE_OK. Or, if an error does occur, set *ppOut
** to NULL and return an SQLite error code.
**
................................................................................
        for(iTask=0; iTask<(pSorter->nTask-1); iTask++){
          IncrMerger *pIncr;
          if( (pIncr = pMain->aIter[iTask].pIncr) ){
            vdbeIncrSetThreads(pIncr, pSorter->bUseThreads);
            assert( pIncr->pTask!=pLast );
          }
        }







      }
    }
  }


  if( rc==SQLITE_OK ) rc = vdbeIncrInit2(pIter);


  sqlite3_free(aMerge);
  return rc;
}


/*







>
>
>
|








>
>
>
>
|
>







 







>
>
|


|





>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







>
>
>
>
>
>
>



>
>
|
>







1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
....
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
....
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
static void vdbeIncrSetThreads(IncrMerger *pIncr, int bUseThread){
  if( bUseThread ){
    pIncr->bUseThread = 1;
    pIncr->pTask->file2.iEof -= pIncr->mxSz;
  }
}

#define INCRINIT2_NORMAL 0
#define INCRINIT2_TASK   1
#define INCRINIT2_ROOT   2
static int vdbeIncrInit2(PmaReader *pIter, int eMode){
  int rc = SQLITE_OK;
  IncrMerger *pIncr = pIter->pIncr;
  if( pIncr ){
    SortSubtask *pTask = pIncr->pTask;
    int i;
    MergeEngine *pMerger = pIncr->pMerger;

    for(i=0; rc==SQLITE_OK && i<pMerger->nTree; i++){
      IncrMerger *p;
      if( eMode==INCRINIT2_ROOT ){
        rc = vdbePmaReaderNext(&pMerger->aIter[i]);
      }else{
        rc = vdbeIncrInit2(&pMerger->aIter[i], INCRINIT2_NORMAL);
      }
    }

    /* Set up the required files for pIncr */
    if( rc==SQLITE_OK ){
      if( pIncr->bUseThread==0 ){
        if( pTask->file2.pFd==0 ){
          rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTask->file2.pFd);
................................................................................
    }

    for(i=pMerger->nTree-1; rc==SQLITE_OK && i>0; i--){
      rc = vdbeSorterDoCompare(pIncr->pTask, pMerger, i);
    }

    if( rc==SQLITE_OK && pIncr->bUseThread ){
      /* Use the current thread */
      assert( eMode==INCRINIT2_ROOT || eMode==INCRINIT2_TASK );
      rc = vdbeIncrPopulate(pIncr);
    }

    if( rc==SQLITE_OK && eMode!=INCRINIT2_TASK ){
      rc = vdbePmaReaderNext(pIter);
    }
  }
  return rc;
}

static void *vdbeIncrInit2Thread(void *pCtx){
  PmaReader *pReader = (PmaReader*)pCtx;
  void *pRet = SQLITE_INT_TO_PTR( vdbeIncrInit2(pReader, INCRINIT2_TASK) );
  pReader->pIncr->thread.bDone = 1;
  return pRet;
}

static int vdbeIncrBgInit2(PmaReader *pIter){
  void *pCtx = (void*)pIter;
  return vdbeSorterCreateThread(
      &pIter->pIncr->thread, vdbeIncrInit2Thread, pCtx
  );
}

/*
** Allocate a new MergeEngine object to merge the contents of nPMA level-0
** PMAs from pTask->file. If no error occurs, set *ppOut to point to
** the new object and return SQLITE_OK. Or, if an error does occur, set *ppOut
** to NULL and return an SQLite error code.
**
................................................................................
        for(iTask=0; iTask<(pSorter->nTask-1); iTask++){
          IncrMerger *pIncr;
          if( (pIncr = pMain->aIter[iTask].pIncr) ){
            vdbeIncrSetThreads(pIncr, pSorter->bUseThreads);
            assert( pIncr->pTask!=pLast );
          }
        }
        if( pSorter->nTask>1 ){
          for(iTask=0; rc==SQLITE_OK && iTask<pSorter->nTask; iTask++){
            PmaReader *p = &pMain->aIter[iTask];
            assert( p->pIncr==0 || p->pIncr->pTask==&pSorter->aTask[iTask] );
            if( p->pIncr ){ rc = vdbeIncrBgInit2(p); }
          }
        }
      }
    }
  }
  if( rc==SQLITE_OK ){
    int eMode = (pSorter->nTask>1 ? INCRINIT2_ROOT : INCRINIT2_NORMAL);
    rc = vdbeIncrInit2(pIter, eMode);
  }

  sqlite3_free(aMerge);
  return rc;
}


/*