/ Check-in [62c406a0]
Login
SQLite training in Houston TX on 2019-11-05 (details)
Part of the 2019 Tcl Conference

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Fix many issues with new code.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | threads-experimental
Files: files | file ages | folders
SHA1: 62c406a042d7246f6df6b943421182a88483b2e3
User & Date: dan 2014-04-12 19:34:44
Context
2014-04-14
07:30
Improve use of multiple threads in sqlite3VdbeSorterRewind(). check-in: e1bdc4b8 user: dan tags: threads-experimental
2014-04-12
19:34
Fix many issues with new code. check-in: 62c406a0 user: dan tags: threads-experimental
2014-04-11
19:43
Avoid having the sorter merge too many PMAs at a time when incrementally merging data following a SorterRewind(). check-in: 98bf0307 user: dan tags: threads-experimental
Changes
Hide Diffs Side-by-Side Diffs Ignore Whitespace Patch

Changes to src/vdbesort.c.

    84     84   ** algorithm might be performed in parallel by separate threads.  Threads
    85     85   ** are only used when one or more PMA spill to disk.  If the sort is small
    86     86   ** enough to fit entirely in memory, everything happens on the main thread.
    87     87   */
    88     88   #include "sqliteInt.h"
    89     89   #include "vdbeInt.h"
    90     90   
           91  +/* 
           92  +** If SQLITE_DEBUG_SORTER_THREADS is defined, this module outputs various
           93  +** messages to stderr that may be helpful in understanding the performance
           94  +** characteristics of the sorter in multi-threaded mode.
           95  +*/
           96  +#if 0
           97  +# define SQLITE_DEBUG_SORTER_THREADS 1
           98  +#endif
           99  +
    91    100   /*
    92    101   ** Private objects used by the sorter
    93    102   */
    94    103   typedef struct MergeEngine MergeEngine;     /* Merge PMAs together */
    95    104   typedef struct PmaReader PmaReader;         /* Incrementally read one PMA */
    96    105   typedef struct PmaWriter PmaWriter;         /* Incrementally write on PMA */
    97    106   typedef struct SorterRecord SorterRecord;   /* A record being sorted */
    98    107   typedef struct SortSubtask SortSubtask;     /* A sub-task in the sort process */
    99    108   typedef struct SorterFile SorterFile;
          109  +typedef struct SorterThread SorterThread;
          110  +typedef struct SorterList SorterList;
   100    111   typedef struct IncrMerger IncrMerger;
   101    112   
   102         -
   103    113   /*
   104         -** Candidate values for SortSubtask.eWork
          114  +** A container for a temp file handle and the current amount of data 
          115  +** stored in the file.
   105    116   */
   106         -#define SORT_SUBTASK_SORT   1     /* Sort records on pList */
   107         -#define SORT_SUBTASK_TO_PMA 2     /* Xfer pList to Packed-Memory-Array pTemp1 */
   108         -#define SORT_SUBTASK_CONS   3     /* Consolidate multiple PMAs */
   109         -
   110    117   struct SorterFile {
   111         -  sqlite3_file *pFd;
   112         -  i64 iEof;
          118  +  sqlite3_file *pFd;              /* File handle */
          119  +  i64 iEof;                       /* Bytes of data stored in pFd */
          120  +};
          121  +
          122  +/*
          123  +** An object of this type is used to store the thread handle for each 
          124  +** background thread launched by the sorter. Before the thread is launched,
          125  +** variable bDone is set to 0. Then, right before it exits, the thread 
          126  +** itself sets bDone to 1.
          127  +**
          128  +** This is then used for two purposes:
          129  +**
          130  +**   1. When flushing the contents of memory to a level-0 PMA on disk, to
          131  +**      attempt to select a SortSubtask for which there is not already an
          132  +**      active background thread (since doing so causes the main thread
          133  +**      to block until it finishes).
          134  +**
          135  +**   2. If SQLITE_DEBUG_SORTER_THREADS is defined, to determine if a call
          136  +**      to sqlite3ThreadJoin() is likely to block.
          137  +**
          138  +** In both cases, the effects of the main thread seeing (bDone==0) even
          139  +** after the thread has finished are not dire. So we don't worry about
          140  +** memory barriers and such here.
          141  +*/
          142  +struct SorterThread {
          143  +  SQLiteThread *pThread;
          144  +  int bDone;
          145  +};
          146  +
          147  +struct SorterList {
          148  +  SorterRecord *pList;            /* Linked list of records */
          149  +  u8 *aMemory;                    /* If non-NULL, blob of memory for pList */
          150  +  int szPMA;                      /* Size of pList as PMA in bytes */
   113    151   };
   114    152   
   115    153   /*
   116    154   ** Sorting is divided up into smaller subtasks.  Each subtask is controlled
   117    155   ** by an instance of this object. A Subtask might run in either the main thread
   118    156   ** or in a background thread.
   119    157   **
................................................................................
   144    182   **     the temp file if it is not already open.
   145    183   **
   146    184   **   SORT_SUBTASK_CONS:
   147    185   **     Merge existing PMAs until SortSubtask.nConsolidate or fewer
   148    186   **     remain in temp file SortSubtask.pTemp1.
   149    187   */
   150    188   struct SortSubtask {
   151         -  SQLiteThread *pThread;          /* Thread handle, or NULL */
   152         -  int bDone;                      /* Set to true by pTask when finished */
   153         -
          189  +  SorterThread thread;
   154    190     sqlite3 *db;                    /* Database connection */
   155    191     VdbeSorter *pSorter;            /* Sorter */
   156    192     KeyInfo *pKeyInfo;              /* How to compare records */
   157    193     UnpackedRecord *pUnpacked;      /* Space to unpack a record */
   158    194     int pgsz;                       /* Main database page size */
   159         -
   160         -  u8 eWork;                       /* One of the SORT_SUBTASK_* constants */
   161         -  int nConsolidate;               /* For SORT_SUBTASK_CONS, max final PMAs */
   162         -  SorterRecord *pList;            /* List of records for pTask to sort */
   163         -  int nInMemory;                  /* Expected size of PMA based on pList */
   164         -  u8 *aListMemory;                /* Records memory (or NULL) */
   165         -
          195  +  SorterList list;                /* List for thread to write to a PMA */
   166    196     int nPMA;                       /* Number of PMAs currently in file */
   167    197     SorterFile file;                /* Temp file for level-0 PMAs */
   168    198     SorterFile file2;               /* Space for other PMAs */
   169    199   };
   170    200   
   171    201   
   172    202   /*
................................................................................
   244    274   **
   245    275   ** mxKeysize:
   246    276   **   As records are added to the sorter by calls to sqlite3VdbeSorterWrite(),
   247    277   **   this variable is updated so as to be set to the size on disk of the
   248    278   **   largest record in the sorter.
   249    279   */
   250    280   struct VdbeSorter {
   251         -  int nInMemory;                  /* Current size of pRecord list as PMA */
   252    281     int mnPmaSize;                  /* Minimum PMA size, in bytes */
   253    282     int mxPmaSize;                  /* Maximum PMA size, in bytes.  0==no limit */
   254    283     int bUsePMA;                    /* True if one or more PMAs created */
   255    284     int bUseThreads;                /* True if one or more PMAs created */
   256         -  SorterRecord *pRecord;          /* Head of in-memory record list */
   257    285     PmaReader *pReader;             /* Read data from here after Rewind() */
   258    286     int mxKeysize;                  /* Largest serialized key seen so far */
   259    287     UnpackedRecord *pUnpacked;      /* Used by VdbeSorterCompare() */
          288  +#if 0
          289  +  int nInMemory;                  /* Current size of pRecord list as PMA */
          290  +  SorterRecord *pRecord;          /* Head of in-memory record list */
   260    291     u8 *aMemory;                    /* Block of memory to alloc records from */
          292  +#endif
          293  +  SorterList list;
   261    294     int iMemory;                    /* Offset of first free byte in aMemory */
   262    295     int nMemory;                    /* Size of aMemory allocation in bytes */
   263    296     int iPrev;                      /* Previous thread used to flush PMA */
   264    297     int nTask;                      /* Size of aTask[] array */
   265    298     SortSubtask aTask[1];           /* One or more subtasks */
   266    299   };
   267    300   
................................................................................
   288    321   ** Normally, a PmaReader object iterates through an existing PMA stored 
   289    322   ** within a temp file. However, if the PmaReader.pIncr variable points to
   290    323   ** an object of the following type, it may be used to iterate/merge through
   291    324   ** multiple PMAs simultaneously.
   292    325   */
   293    326   struct IncrMerger {
   294    327     SortSubtask *pTask;             /* Task that owns this merger */
   295         -  SQLiteThread *pThread;          /* Thread currently populating aFile[1] */
          328  +  SorterThread thread;            /* Thread for populating aFile[1] */
   296    329     MergeEngine *pMerger;           /* Merge engine thread reads data from */
   297    330     i64 iStartOff;                  /* Offset to start writing file at */
   298    331     int mxSz;                       /* Maximum bytes of data to store */
   299    332     int bEof;                       /* Set to true when merge is finished */
   300    333     int bUseThread;                 /* True to use a bg thread for this object */
   301    334     SorterFile aFile[2];            /* aFile[0] for reading, [1] for writing */
   302    335   };
................................................................................
   783    816   
   784    817         /* If the application is using memsys3 or memsys5, use a separate 
   785    818         ** allocation for each sort-key in memory. Otherwise, use a single big
   786    819         ** allocation at pSorter->aMemory for all sort-keys.  */
   787    820         if( sqlite3GlobalConfig.pHeap==0 ){
   788    821           assert( pSorter->iMemory==0 );
   789    822           pSorter->nMemory = pgsz;
   790         -        pSorter->aMemory = (u8*)sqlite3Malloc(pgsz);
   791         -        if( !pSorter->aMemory ) rc = SQLITE_NOMEM;
          823  +        pSorter->list.aMemory = (u8*)sqlite3Malloc(pgsz);
          824  +        if( !pSorter->list.aMemory ) rc = SQLITE_NOMEM;
   792    825         }
   793    826       }
   794    827     }
   795    828   
   796    829     return rc;
   797    830   }
   798    831   
................................................................................
   811    844   /*
   812    845   ** Free all resources owned by the object indicated by argument pTask. All 
   813    846   ** fields of *pTask are zeroed before returning.
   814    847   */
   815    848   static void vdbeSortSubtaskCleanup(sqlite3 *db, SortSubtask *pTask){
   816    849     sqlite3DbFree(db, pTask->pUnpacked);
   817    850     pTask->pUnpacked = 0;
   818         -  if( pTask->aListMemory==0 ){
   819         -    vdbeSorterRecordFree(0, pTask->pList);
          851  +  if( pTask->list.aMemory==0 ){
          852  +    vdbeSorterRecordFree(0, pTask->list.pList);
   820    853     }else{
   821         -    sqlite3_free(pTask->aListMemory);
   822         -    pTask->aListMemory = 0;
          854  +    sqlite3_free(pTask->list.aMemory);
          855  +    pTask->list.aMemory = 0;
   823    856     }
   824         -  pTask->pList = 0;
          857  +  pTask->list.pList = 0;
   825    858     if( pTask->file.pFd ){
   826    859       sqlite3OsCloseFree(pTask->file.pFd);
   827    860       pTask->file.pFd = 0;
   828    861       pTask->file.iEof = 0;
   829    862     }
   830    863     if( pTask->file2.pFd ){
   831    864       sqlite3OsCloseFree(pTask->file2.pFd);
   832    865       pTask->file2.pFd = 0;
   833    866       pTask->file2.iEof = 0;
   834    867     }
   835    868   }
   836    869   
   837         -/*
   838         -** Join all threads.  
   839         -*/
          870  +#ifdef SQLITE_DEBUG_SORTER_THREADS
          871  +static void vdbeSorterWorkDebug(SortSubtask *pTask, const char *zEvent){
          872  +  i64 t;
          873  +  int iTask = (pTask - pTask->pSorter->aTask);
          874  +  sqlite3OsCurrentTimeInt64(pTask->db->pVfs, &t);
          875  +  fprintf(stderr, "%lld:%d %s\n", t, iTask, zEvent);
          876  +}
          877  +static void vdbeSorterRewindDebug(sqlite3 *db, const char *zEvent){
          878  +  i64 t;
          879  +  sqlite3OsCurrentTimeInt64(db->pVfs, &t);
          880  +  fprintf(stderr, "%lld:X %s\n", t, zEvent);
          881  +}
          882  +static void vdbeSorterPopulateDebug(
          883  +  SortSubtask *pTask,
          884  +  const char *zEvent
          885  +){
          886  +  i64 t;
          887  +  int iTask = (pTask - pTask->pSorter->aTask);
          888  +  sqlite3OsCurrentTimeInt64(pTask->db->pVfs, &t);
          889  +  fprintf(stderr, "%lld:bg%d %s\n", t, iTask, zEvent);
          890  +}
          891  +static void vdbeSorterBlockDebug(
          892  +  SortSubtask *pTask,
          893  +  int bBlocked,
          894  +  const char *zEvent
          895  +){
          896  +  if( bBlocked ){
          897  +    i64 t;
          898  +    sqlite3OsCurrentTimeInt64(pTask->db->pVfs, &t);
          899  +    fprintf(stderr, "%lld:main %s\n", t, zEvent);
          900  +  }
          901  +}
          902  +#else
          903  +# define vdbeSorterWorkDebug(x,y)
          904  +# define vdbeSorterRewindDebug(x,y)
          905  +# define vdbeSorterPopulateDebug(x,y)
          906  +# define vdbeSorterBlockDebug(x,y,z)
          907  +#endif
          908  +
   840    909   #if SQLITE_MAX_WORKER_THREADS>0
          910  +/*
          911  +** Join thread p.
          912  +*/
          913  +static int vdbeSorterJoinThread(SortSubtask *pTask, SorterThread *p){
          914  +  int rc = SQLITE_OK;
          915  +  if( p->pThread ){
          916  +#ifdef SQLITE_DEBUG_SORTER_THREADS
          917  +    int bDone = p->bDone;
          918  +#endif
          919  +    void *pRet;
          920  +    vdbeSorterBlockDebug(pTask, !bDone, "enter");
          921  +    rc = sqlite3ThreadJoin(p->pThread, &pRet);
          922  +    vdbeSorterBlockDebug(pTask, !bDone, "exit");
          923  +    if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet);
          924  +    assert( p->bDone==1 );
          925  +    p->bDone = 0;
          926  +    p->pThread = 0;
          927  +  }
          928  +  return rc;
          929  +}
          930  +
          931  +/*
          932  +** Launch a background thread to run xTask(pIn).
          933  +*/
          934  +static int vdbeSorterCreateThread(
          935  +  SorterThread *p,                /* Thread object to populate */
          936  +  void *(*xTask)(void*),          /* Routine to run in a separate thread */
          937  +  void *pIn                       /* Argument passed into xTask() */
          938  +){
          939  +  assert( p->pThread==0 && p->bDone==0 );
          940  +  return sqlite3ThreadCreate(&p->pThread, xTask, pIn);
          941  +}
          942  +
          943  +/*
          944  +** Join all outstanding threads launched by SorterWrite() to create 
          945  +** level-0 PMAs.
          946  +*/
   841    947   static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){
   842    948     int rc = rcin;
   843    949     int i;
   844    950     for(i=0; i<pSorter->nTask; i++){
   845    951       SortSubtask *pTask = &pSorter->aTask[i];
   846         -    if( pTask->pThread ){
   847         -      void *pRet;
   848         -      int rc2 = sqlite3ThreadJoin(pTask->pThread, &pRet);
   849         -      pTask->pThread = 0;
   850         -      pTask->bDone = 0;
   851         -      if( rc==SQLITE_OK ) rc = rc2;
   852         -      if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet);
   853         -    }
          952  +    int rc2 = vdbeSorterJoinThread(pTask, &pTask->thread);
          953  +    if( rc==SQLITE_OK ) rc = rc2;
   854    954     }
   855    955     return rc;
   856    956   }
   857    957   #else
   858    958   # define vdbeSorterJoinAll(x,rcin) (rcin)
          959  +# define vdbeSorterJoinThread(pTask,p) SQLITE_OK
   859    960   #endif
   860    961   
   861    962   /*
   862    963   ** Allocate a new MergeEngine object with space for nIter iterators.
   863    964   */
   864    965   static MergeEngine *vdbeMergeEngineNew(int nIter){
   865    966     int N = 2;                      /* Smallest power of two >= nIter */
................................................................................
   904   1005       sqlite3DbFree(db, pSorter->pReader);
   905   1006       pSorter->pReader = 0;
   906   1007     }
   907   1008     for(i=0; i<pSorter->nTask; i++){
   908   1009       SortSubtask *pTask = &pSorter->aTask[i];
   909   1010       vdbeSortSubtaskCleanup(db, pTask);
   910   1011     }
   911         -  if( pSorter->aMemory==0 ){
   912         -    vdbeSorterRecordFree(0, pSorter->pRecord);
         1012  +  if( pSorter->list.aMemory==0 ){
         1013  +    vdbeSorterRecordFree(0, pSorter->list.pList);
   913   1014     }
   914         -  pSorter->pRecord = 0;
   915         -  pSorter->nInMemory = 0;
         1015  +  pSorter->list.pList = 0;
         1016  +  pSorter->list.szPMA = 0;
   916   1017     pSorter->bUsePMA = 0;
   917   1018     pSorter->iMemory = 0;
   918   1019     pSorter->mxKeysize = 0;
   919   1020     sqlite3DbFree(db, pSorter->pUnpacked);
   920   1021     pSorter->pUnpacked = 0;
   921   1022   }
   922   1023   
................................................................................
   923   1024   /*
   924   1025   ** Free any cursor components allocated by sqlite3VdbeSorterXXX routines.
   925   1026   */
   926   1027   void sqlite3VdbeSorterClose(sqlite3 *db, VdbeCursor *pCsr){
   927   1028     VdbeSorter *pSorter = pCsr->pSorter;
   928   1029     if( pSorter ){
   929   1030       sqlite3VdbeSorterReset(db, pSorter);
   930         -    sqlite3_free(pSorter->aMemory);
         1031  +    sqlite3_free(pSorter->list.aMemory);
   931   1032       sqlite3DbFree(db, pSorter);
   932   1033       pCsr->pSorter = 0;
   933   1034     }
   934   1035   }
   935   1036   
   936   1037   /*
   937   1038   ** Allocate space for a file-handle and open a temporary file. If successful,
................................................................................
   947   1048     );
   948   1049     if( rc==SQLITE_OK ){
   949   1050       i64 max = SQLITE_MAX_MMAP_SIZE;
   950   1051       sqlite3OsFileControlHint( *ppFile, SQLITE_FCNTL_MMAP_SIZE, (void*)&max);
   951   1052     }
   952   1053     return rc;
   953   1054   }
         1055  +
         1056  +static int vdbeSortAllocUnpacked(SortSubtask *pTask){
         1057  +  if( pTask->pUnpacked==0 ){
         1058  +    char *pFree;
         1059  +    pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord(
         1060  +        pTask->pKeyInfo, 0, 0, &pFree
         1061  +    );
         1062  +    assert( pTask->pUnpacked==(UnpackedRecord*)pFree );
         1063  +    if( pFree==0 ) return SQLITE_NOMEM;
         1064  +    pTask->pUnpacked->nField = pTask->pKeyInfo->nField;
         1065  +    pTask->pUnpacked->errCode = 0;
         1066  +  }
         1067  +  return SQLITE_OK;
         1068  +}
         1069  +
   954   1070   
   955   1071   /*
   956   1072   ** Merge the two sorted lists p1 and p2 into a single list.
   957   1073   ** Set *ppOut to the head of the new list.
   958   1074   */
   959   1075   static void vdbeSorterMerge(
   960   1076     SortSubtask *pTask,             /* Calling thread context */
................................................................................
   987   1103   }
   988   1104   
   989   1105   /*
   990   1106   ** Sort the linked list of records headed at pTask->pList. Return 
   991   1107   ** SQLITE_OK if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if 
   992   1108   ** an error occurs.
   993   1109   */
   994         -static int vdbeSorterSort(SortSubtask *pTask){
         1110  +static int vdbeSorterSort(SortSubtask *pTask, SorterList *pList){
   995   1111     int i;
   996   1112     SorterRecord **aSlot;
   997   1113     SorterRecord *p;
         1114  +  int rc;
         1115  +
         1116  +  rc = vdbeSortAllocUnpacked(pTask);
         1117  +  if( rc!=SQLITE_OK ) return rc;
   998   1118   
   999   1119     aSlot = (SorterRecord **)sqlite3MallocZero(64 * sizeof(SorterRecord *));
  1000   1120     if( !aSlot ){
  1001   1121       return SQLITE_NOMEM;
  1002   1122     }
  1003   1123   
  1004         -  p = pTask->pList;
         1124  +  p = pList->pList;
  1005   1125     while( p ){
  1006   1126       SorterRecord *pNext;
  1007         -    if( pTask->aListMemory ){
  1008         -      if( (u8*)p==pTask->aListMemory ){
         1127  +    if( pList->aMemory ){
         1128  +      if( (u8*)p==pList->aMemory ){
  1009   1129           pNext = 0;
  1010   1130         }else{
  1011         -        assert( p->u.iNext<sqlite3MallocSize(pTask->aListMemory) );
  1012         -        pNext = (SorterRecord*)&pTask->aListMemory[p->u.iNext];
         1131  +        assert( p->u.iNext<sqlite3MallocSize(pList->aMemory) );
         1132  +        pNext = (SorterRecord*)&pList->aMemory[p->u.iNext];
  1013   1133         }
  1014   1134       }else{
  1015   1135         pNext = p->u.pNext;
  1016   1136       }
  1017   1137   
  1018   1138       p->u.pNext = 0;
  1019   1139       for(i=0; aSlot[i]; i++){
................................................................................
  1024   1144       p = pNext;
  1025   1145     }
  1026   1146   
  1027   1147     p = 0;
  1028   1148     for(i=0; i<64; i++){
  1029   1149       vdbeSorterMerge(pTask, p, aSlot[i], &p);
  1030   1150     }
  1031         -  pTask->pList = p;
         1151  +  pList->pList = p;
  1032   1152   
  1033   1153     sqlite3_free(aSlot);
         1154  +  if( pTask->pUnpacked->errCode ){
         1155  +    assert( pTask->pUnpacked->errCode==SQLITE_NOMEM );
         1156  +    return SQLITE_NOMEM;
         1157  +  }
  1034   1158     return SQLITE_OK;
  1035   1159   }
  1036   1160   
  1037   1161   /*
  1038   1162   ** Initialize a PMA-writer object.
  1039   1163   */
  1040   1164   static void vdbePmaWriterInit(
................................................................................
  1140   1264   }
  1141   1265   #else
  1142   1266   # define vdbeSorterExtendFile(x,y,z) SQLITE_OK
  1143   1267   #endif
  1144   1268   
  1145   1269   
  1146   1270   /*
  1147         -** Write the current contents of the in-memory linked-list to a PMA. Return
  1148         -** SQLITE_OK if successful, or an SQLite error code otherwise.
         1271  +** Write the current contents of in-memory linked-list pList to a level-0
         1272  +** PMA in the temp file belonging to sub-task pTask. Return SQLITE_OK if 
         1273  +** successful, or an SQLite error code otherwise.
  1149   1274   **
  1150   1275   ** The format of a PMA is:
  1151   1276   **
  1152   1277   **     * A varint. This varint contains the total number of bytes of content
  1153   1278   **       in the PMA (not including the varint itself).
  1154   1279   **
  1155   1280   **     * One or more records packed end-to-end in order of ascending keys. 
  1156   1281   **       Each record consists of a varint followed by a blob of data (the 
  1157   1282   **       key). The varint is the number of bytes in the blob of data.
  1158   1283   */
  1159         -static int vdbeSorterListToPMA(SortSubtask *pTask){
         1284  +static int vdbeSorterListToPMA(SortSubtask *pTask, SorterList *pList){
  1160   1285     int rc = SQLITE_OK;             /* Return code */
  1161   1286     PmaWriter writer;               /* Object used to write to the file */
  1162   1287   
         1288  +#ifdef SQLITE_DEBUG
         1289  +  /* Set iSz to the expected size of file pTask->file after writing the PMA. 
         1290  +  ** This is used by an assert() statement at the end of this function.  */
         1291  +  i64 iSz = pList->szPMA + sqlite3VarintLen(pList->szPMA) + pTask->file.iEof;
         1292  +#endif
         1293  +
         1294  +  vdbeSorterWorkDebug(pTask, "enter");
  1163   1295     memset(&writer, 0, sizeof(PmaWriter));
  1164         -  assert( pTask->nInMemory>0 );
         1296  +  assert( pList->szPMA>0 );
  1165   1297   
  1166   1298     /* If the first temporary PMA file has not been opened, open it now. */
  1167   1299     if( pTask->file.pFd==0 ){
  1168   1300       rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTask->file.pFd);
  1169   1301       assert( rc!=SQLITE_OK || pTask->file.pFd );
  1170   1302       assert( pTask->file.iEof==0 );
  1171   1303       assert( pTask->nPMA==0 );
  1172   1304     }
  1173   1305   
  1174   1306     /* Try to get the file to memory map */
  1175   1307     if( rc==SQLITE_OK ){
  1176   1308       vdbeSorterExtendFile(pTask->db, 
  1177         -        pTask->file.pFd, pTask->file.iEof + pTask->nInMemory + 9
         1309  +        pTask->file.pFd, pTask->file.iEof + pList->szPMA + 9
  1178   1310       );
  1179   1311     }
         1312  +
         1313  +  /* Sort the list */
         1314  +  if( rc==SQLITE_OK ){
         1315  +    rc = vdbeSorterSort(pTask, pList);
         1316  +  }
  1180   1317   
  1181   1318     if( rc==SQLITE_OK ){
  1182   1319       SorterRecord *p;
  1183   1320       SorterRecord *pNext = 0;
  1184   1321   
  1185   1322       vdbePmaWriterInit(pTask->file.pFd, &writer, pTask->pgsz,
  1186   1323                         pTask->file.iEof);
  1187   1324       pTask->nPMA++;
  1188         -    vdbePmaWriteVarint(&writer, pTask->nInMemory);
  1189         -    for(p=pTask->pList; p; p=pNext){
         1325  +    vdbePmaWriteVarint(&writer, pList->szPMA);
         1326  +    for(p=pList->pList; p; p=pNext){
  1190   1327         pNext = p->u.pNext;
  1191   1328         vdbePmaWriteVarint(&writer, p->nVal);
  1192   1329         vdbePmaWriteBlob(&writer, SRVAL(p), p->nVal);
  1193         -      if( pTask->aListMemory==0 ) sqlite3_free(p);
         1330  +      if( pList->aMemory==0 ) sqlite3_free(p);
  1194   1331       }
  1195         -    pTask->pList = p;
         1332  +    pList->pList = p;
  1196   1333       rc = vdbePmaWriterFinish(&writer, &pTask->file.iEof);
  1197   1334     }
  1198   1335   
  1199         -  assert( pTask->pList==0 || rc!=SQLITE_OK );
         1336  +  vdbeSorterWorkDebug(pTask, "exit");
         1337  +  assert( rc!=SQLITE_OK || pList->pList==0 );
         1338  +  assert( rc!=SQLITE_OK || pTask->file.iEof==iSz );
  1200   1339     return rc;
  1201   1340   }
  1202   1341   
  1203   1342   /*
  1204   1343   ** Advance the MergeEngine iterator passed as the second argument to
  1205   1344   ** the next entry. Set *pbEof to true if this means the iterator has 
  1206   1345   ** reached EOF.
................................................................................
  1271   1410       }
  1272   1411       *pbEof = (pMerger->aIter[pMerger->aTree[1]].pFile==0);
  1273   1412     }
  1274   1413   
  1275   1414     return rc;
  1276   1415   }
  1277   1416   
  1278         -#if 0
  1279         -static void vdbeSorterWorkDebug(SortSubtask *pTask, const char *zEvent){
  1280         -  i64 t;
  1281         -  int iTask = (pTask - pTask->pSorter->aTask);
  1282         -  sqlite3OsCurrentTimeInt64(pTask->db->pVfs, &t);
  1283         -  fprintf(stderr, "%lld:%d %s\n", t, iTask, zEvent);
  1284         -}
  1285         -static void vdbeSorterRewindDebug(sqlite3 *db, const char *zEvent){
  1286         -  i64 t;
  1287         -  sqlite3OsCurrentTimeInt64(db->pVfs, &t);
  1288         -  fprintf(stderr, "%lld:X %s\n", t, zEvent);
  1289         -}
  1290         -static void vdbeSorterPopulateDebug(
  1291         -  SortSubtask *pTask,
  1292         -  const char *zEvent
  1293         -){
  1294         -  i64 t;
  1295         -  int iTask = (pTask - pTask->pSorter->aTask);
  1296         -  sqlite3OsCurrentTimeInt64(pTask->db->pVfs, &t);
  1297         -  fprintf(stderr, "%lld:bg%d %s\n", t, iTask, zEvent);
  1298         -}
  1299         -#else
  1300         -# define vdbeSorterWorkDebug(x,y)
  1301         -# define vdbeSorterRewindDebug(x,y)
  1302         -# define vdbeSorterPopulateDebug(x,y)
  1303         -#endif
  1304         -
  1305         -static int vdbeSortAllocUnpacked(SortSubtask *pTask){
  1306         -  if( pTask->pUnpacked==0 ){
  1307         -    char *pFree;
  1308         -    pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord(
  1309         -        pTask->pKeyInfo, 0, 0, &pFree
  1310         -    );
  1311         -    assert( pTask->pUnpacked==(UnpackedRecord*)pFree );
  1312         -    if( pFree==0 ) return SQLITE_NOMEM;
  1313         -    pTask->pUnpacked->nField = pTask->pKeyInfo->nField;
  1314         -    pTask->pUnpacked->errCode = 0;
  1315         -  }
  1316         -  return SQLITE_OK;
  1317         -}
  1318         -
  1319   1417   /*
  1320   1418   ** The main routine for sorter-thread operations.
  1321   1419   */
  1322         -static void *vdbeSortSubtaskMain(void *pCtx){
  1323         -  int rc = SQLITE_OK;
         1420  +static void *vdbeSorterFlushThread(void *pCtx){
  1324   1421     SortSubtask *pTask = (SortSubtask*)pCtx;
  1325         -
  1326         -  assert( pTask->eWork==SORT_SUBTASK_SORT
  1327         -       || pTask->eWork==SORT_SUBTASK_TO_PMA
  1328         -       || pTask->eWork==SORT_SUBTASK_CONS
  1329         -  );
  1330         -  assert( pTask->bDone==0 );
  1331         -
  1332         -  vdbeSorterWorkDebug(pTask, "enter");
  1333         -
  1334         -  rc = vdbeSortAllocUnpacked(pTask);
  1335         -  if( rc!=SQLITE_OK ) goto thread_out;
  1336         -
  1337         -  if( pTask->eWork==SORT_SUBTASK_CONS ){
  1338         -    assert( pTask->pList==0 );
  1339         -    while( pTask->nPMA>pTask->nConsolidate && rc==SQLITE_OK ){
  1340         -      int nIter = MIN(pTask->nPMA, SORTER_MAX_MERGE_COUNT);
  1341         -      sqlite3_file *pTemp2 = 0;     /* Second temp file to use */
  1342         -      MergeEngine *pMerger;         /* Object for reading/merging PMA data */
  1343         -      i64 iReadOff = 0;             /* Offset in pTemp1 to read from */
  1344         -      i64 iWriteOff = 0;            /* Offset in pTemp2 to write to */
  1345         -      int i;
  1346         -      
  1347         -      /* Allocate a merger object to merge PMAs together. */
  1348         -      pMerger = vdbeMergeEngineNew(nIter);
  1349         -      if( pMerger==0 ){
  1350         -        rc = SQLITE_NOMEM;
  1351         -        break;
  1352         -      }
  1353         -
  1354         -      /* Open a second temp file to write merged data to */
  1355         -      rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTemp2);
  1356         -      if( rc==SQLITE_OK ){
  1357         -        vdbeSorterExtendFile(pTask->db, pTemp2, pTask->file.iEof);
  1358         -      }else{
  1359         -        vdbeMergeEngineFree(pMerger);
  1360         -        break;
  1361         -      }
  1362         -
  1363         -      /* This loop runs once for each output PMA. Each output PMA is made
  1364         -      ** of data merged from up to SORTER_MAX_MERGE_COUNT input PMAs. */
  1365         -      for(i=0; rc==SQLITE_OK && i<pTask->nPMA; i+=SORTER_MAX_MERGE_COUNT){
  1366         -        PmaWriter writer;         /* Object for writing data to pTemp2 */
  1367         -        i64 nOut = 0;             /* Bytes of data in output PMA */
  1368         -        int bEof = 0;
  1369         -        int rc2;
  1370         -
  1371         -        /* Configure the merger object to read and merge data from the next 
  1372         -        ** SORTER_MAX_MERGE_COUNT PMAs in pTemp1 (or from all remaining PMAs,
  1373         -        ** if that is fewer). */
  1374         -        int iIter;
  1375         -        for(iIter=0; iIter<SORTER_MAX_MERGE_COUNT; iIter++){
  1376         -          PmaReader *pIter = &pMerger->aIter[iIter];
  1377         -          rc = vdbePmaReaderInit(pTask, &pTask->file, iReadOff, pIter, &nOut);
  1378         -          iReadOff = pIter->iEof;
  1379         -          if( iReadOff>=pTask->file.iEof || rc!=SQLITE_OK ) break;
  1380         -        }
  1381         -        for(iIter=pMerger->nTree-1; rc==SQLITE_OK && iIter>0; iIter--){
  1382         -          rc = vdbeSorterDoCompare(pTask, pMerger, iIter);
  1383         -        }
  1384         -
  1385         -        vdbePmaWriterInit(pTemp2, &writer, pTask->pgsz, iWriteOff);
  1386         -        vdbePmaWriteVarint(&writer, nOut);
  1387         -        while( rc==SQLITE_OK && bEof==0 ){
  1388         -          PmaReader *pIter = &pMerger->aIter[ pMerger->aTree[1] ];
  1389         -          assert( pIter->pFile!=0 );        /* pIter is not at EOF */
  1390         -          vdbePmaWriteVarint(&writer, pIter->nKey);
  1391         -          vdbePmaWriteBlob(&writer, pIter->aKey, pIter->nKey);
  1392         -          rc = vdbeSorterNext(pTask, pMerger, &bEof);
  1393         -        }
  1394         -        rc2 = vdbePmaWriterFinish(&writer, &iWriteOff);
  1395         -        if( rc==SQLITE_OK ) rc = rc2;
  1396         -      }
  1397         -
  1398         -      vdbeMergeEngineFree(pMerger);
  1399         -      sqlite3OsCloseFree(pTask->file.pFd);
  1400         -      pTask->file.pFd = pTemp2;
  1401         -      pTask->nPMA = (i / SORTER_MAX_MERGE_COUNT);
  1402         -      pTask->file.iEof = iWriteOff;
  1403         -    }
  1404         -  }else{
  1405         -    /* Sort the pTask->pList list */
  1406         -    rc = vdbeSorterSort(pTask);
  1407         -
  1408         -    /* If required, write the list out to a PMA. */
  1409         -    if( rc==SQLITE_OK && pTask->eWork==SORT_SUBTASK_TO_PMA ){
  1410         -#ifdef SQLITE_DEBUG
  1411         -      i64 nExpect = pTask->nInMemory
  1412         -        + sqlite3VarintLen(pTask->nInMemory)
  1413         -        + pTask->file.iEof;
  1414         -#endif
  1415         -      rc = vdbeSorterListToPMA(pTask);
  1416         -      assert( rc!=SQLITE_OK || (nExpect==pTask->file.iEof) );
  1417         -    }
  1418         -  }
  1419         -
  1420         - thread_out:
  1421         -  pTask->bDone = 1;
  1422         -  if( rc==SQLITE_OK && pTask->pUnpacked->errCode ){
  1423         -    assert( pTask->pUnpacked->errCode==SQLITE_NOMEM );
  1424         -    rc = SQLITE_NOMEM;
  1425         -  }
  1426         -  vdbeSorterWorkDebug(pTask, "exit");
         1422  +  int rc;                         /* Return code */
         1423  +  assert( pTask->thread.bDone==0 );
         1424  +  rc = vdbeSorterListToPMA(pTask, &pTask->list);
         1425  +  pTask->thread.bDone = 1;
  1427   1426     return SQLITE_INT_TO_PTR(rc);
  1428   1427   }
  1429   1428   
  1430   1429   /*
  1431         -** Run the activity scheduled by the object passed as the only argument
  1432         -** in the current thread.
         1430  +** Flush the current contents of VdbeSorter.list to a new PMA, possibly
         1431  +** using a background thread.
  1433   1432   */
  1434         -static int vdbeSorterRunTask(SortSubtask *pTask){
  1435         -  int rc = SQLITE_PTR_TO_INT( vdbeSortSubtaskMain((void*)pTask) );
  1436         -  assert( pTask->bDone );
  1437         -  pTask->bDone = 0;
  1438         -  return rc;
  1439         -}
  1440         -
  1441         -/*
  1442         -** Flush the current contents of VdbeSorter.pRecord to a new PMA, possibly
  1443         -** using a background thread.
  1444         -**
  1445         -** If argument bFg is non-zero, the operation always uses the calling thread.
  1446         -*/
  1447         -static int vdbeSorterFlushPMA(sqlite3 *db, const VdbeCursor *pCsr, int bFg){
  1448         -  VdbeSorter *pSorter = pCsr->pSorter;
         1433  +static int vdbeSorterFlushPMA(VdbeSorter *pSorter){
         1434  +#if SQLITE_MAX_WORKER_THREADS==0
         1435  +  pSorter->bUsePMA = 1;
         1436  +  return vdbeSorterListToPMA(&pSorter->aTask[0], &pSorter->list);
         1437  +#else
  1449   1438     int rc = SQLITE_OK;
  1450   1439     int i;
  1451   1440     SortSubtask *pTask = 0;    /* Thread context used to create new PMA */
  1452   1441     int nWorker = (pSorter->nTask-1);
  1453   1442   
         1443  +  /* Set the flag to indicate that at least one PMA has been written. 
         1444  +  ** Or will be, anyhow.  */
  1454   1445     pSorter->bUsePMA = 1;
         1446  +
         1447  +  /* Select a sub-task to sort and flush the current list of in-memory
         1448  +  ** records to disk. If the sorter is running in multi-threaded mode,
         1449  +  ** round-robin between the first (pSorter->nTask-1) tasks. Except, if
         1450  +  ** the background thread from a sub-tasks previous turn is still running,
         1451  +  ** skip it. If the first (pSorter->nTask-1) sub-tasks are all still busy,
         1452  +  ** fall back to using the final sub-task. The first (pSorter->nTask-1)
         1453  +  ** sub-tasks are prefered as they use background threads - the final 
         1454  +  ** sub-task uses the main thread. */
  1455   1455     for(i=0; i<nWorker; i++){
  1456   1456       int iTest = (pSorter->iPrev + i + 1) % nWorker;
  1457   1457       pTask = &pSorter->aTask[iTest];
  1458         -#if SQLITE_MAX_WORKER_THREADS>0
  1459         -    if( pTask->bDone ){
  1460         -      void *pRet;
  1461         -      assert( pTask->pThread );
  1462         -      rc = sqlite3ThreadJoin(pTask->pThread, &pRet);
  1463         -      pTask->pThread = 0;
  1464         -      pTask->bDone = 0;
  1465         -      if( rc==SQLITE_OK ){
  1466         -        rc = SQLITE_PTR_TO_INT(pRet);
  1467         -      }
         1458  +    if( pTask->thread.bDone ){
         1459  +      rc = vdbeSorterJoinThread(pTask, &pTask->thread);
  1468   1460       }
  1469         -#endif
  1470         -    if( pTask->pThread==0 ) break;
  1471         -    pTask = 0;
         1461  +    if( pTask->thread.pThread==0 || rc!=SQLITE_OK ) break;
  1472   1462     }
  1473         -  if( pTask==0 ){
  1474         -    pTask = &pSorter->aTask[nWorker];
  1475         -  }
  1476         -  pSorter->iPrev = (pTask - pSorter->aTask);
  1477   1463   
  1478   1464     if( rc==SQLITE_OK ){
  1479         -    assert( pTask->pThread==0 && pTask->bDone==0 );
  1480         -    pTask->eWork = SORT_SUBTASK_TO_PMA;
  1481         -    pTask->pList = pSorter->pRecord;
  1482         -    pTask->nInMemory = pSorter->nInMemory;
  1483         -    pSorter->nInMemory = 0;
  1484         -    pSorter->pRecord = 0;
  1485         -
  1486         -    if( pSorter->aMemory ){
  1487         -      u8 *aMem = pTask->aListMemory;
  1488         -      pTask->aListMemory = pSorter->aMemory;
  1489         -      pSorter->aMemory = aMem;
  1490         -    }
  1491         -
  1492         -#if SQLITE_MAX_WORKER_THREADS>0
  1493         -    if( !bFg && pTask!=&pSorter->aTask[nWorker] ){
         1465  +    if( i==nWorker ){
         1466  +      /* Use the foreground thread for this operation */
         1467  +      rc = vdbeSorterListToPMA(&pSorter->aTask[nWorker], &pSorter->list);
         1468  +    }else{
  1494   1469         /* Launch a background thread for this operation */
         1470  +      u8 *aMem = pTask->list.aMemory;
  1495   1471         void *pCtx = (void*)pTask;
  1496         -      assert( pSorter->aMemory==0 || pTask->aListMemory!=0 );
  1497         -      if( pTask->aListMemory ){
  1498         -        if( pSorter->aMemory==0 ){
  1499         -          pSorter->aMemory = sqlite3Malloc(pSorter->nMemory);
  1500         -          if( pSorter->aMemory==0 ) return SQLITE_NOMEM;
  1501         -        }else{
  1502         -          pSorter->nMemory = sqlite3MallocSize(pSorter->aMemory);
  1503         -        }
         1472  +
         1473  +      assert( pTask->thread.pThread==0 && pTask->thread.bDone==0 );
         1474  +      assert( pTask->list.pList==0 );
         1475  +      assert( pTask->list.aMemory==0 || pSorter->list.aMemory!=0 );
         1476  +
         1477  +      pSorter->iPrev = (pTask - pSorter->aTask);
         1478  +      pTask->list = pSorter->list;
         1479  +      pSorter->list.pList = 0;
         1480  +      pSorter->list.szPMA = 0;
         1481  +      if( aMem ){
         1482  +        pSorter->list.aMemory = aMem;
         1483  +        pSorter->nMemory = sqlite3MallocSize(aMem);
         1484  +      }else{
         1485  +        pSorter->list.aMemory = sqlite3Malloc(pSorter->nMemory);
         1486  +        if( !pSorter->list.aMemory ) return SQLITE_NOMEM;
  1504   1487         }
  1505         -      rc = sqlite3ThreadCreate(&pTask->pThread, vdbeSortSubtaskMain, pCtx);
  1506         -    }else
  1507         -#endif
  1508         -    {
  1509         -      /* Use the foreground thread for this operation */
  1510         -      rc = vdbeSorterRunTask(pTask);
  1511         -      if( rc==SQLITE_OK ){
  1512         -        u8 *aMem = pTask->aListMemory;
  1513         -        pTask->aListMemory = pSorter->aMemory;
  1514         -        pSorter->aMemory = aMem;
  1515         -        assert( pTask->pList==0 );
  1516         -      }
         1488  +
         1489  +      rc = vdbeSorterCreateThread(&pTask->thread, vdbeSorterFlushThread, pCtx);
  1517   1490       }
  1518   1491     }
  1519   1492   
  1520   1493     return rc;
         1494  +#endif
  1521   1495   }
  1522   1496   
  1523   1497   /*
  1524   1498   ** Add a record to the sorter.
  1525   1499   */
  1526   1500   int sqlite3VdbeSorterWrite(
  1527   1501     sqlite3 *db,                    /* Database handle */
................................................................................
  1553   1527     **
  1554   1528     **   * The total memory allocated for the in-memory list is greater 
  1555   1529     **     than (page-size * 10) and sqlite3HeapNearlyFull() returns true.
  1556   1530     */
  1557   1531     nReq = pVal->n + sizeof(SorterRecord);
  1558   1532     nPMA = pVal->n + sqlite3VarintLen(pVal->n);
  1559   1533     if( pSorter->mxPmaSize ){
  1560         -    if( pSorter->aMemory ){
         1534  +    if( pSorter->list.aMemory ){
  1561   1535         bFlush = pSorter->iMemory && (pSorter->iMemory+nReq) > pSorter->mxPmaSize;
  1562   1536       }else{
  1563   1537         bFlush = (
  1564         -          (pSorter->nInMemory > pSorter->mxPmaSize)
  1565         -       || (pSorter->nInMemory > pSorter->mnPmaSize && sqlite3HeapNearlyFull())
         1538  +          (pSorter->list.szPMA > pSorter->mxPmaSize)
         1539  +       || (pSorter->list.szPMA > pSorter->mnPmaSize && sqlite3HeapNearlyFull())
  1566   1540         );
  1567   1541       }
  1568   1542       if( bFlush ){
  1569         -      rc = vdbeSorterFlushPMA(db, pCsr, 0);
  1570         -      pSorter->nInMemory = 0;
         1543  +      rc = vdbeSorterFlushPMA(pSorter);
         1544  +      pSorter->list.szPMA = 0;
  1571   1545         pSorter->iMemory = 0;
  1572         -      assert( rc!=SQLITE_OK || pSorter->pRecord==0 );
         1546  +      assert( rc!=SQLITE_OK || pSorter->list.pList==0 );
  1573   1547       }
  1574   1548     }
  1575   1549   
  1576         -  pSorter->nInMemory += nPMA;
         1550  +  pSorter->list.szPMA += nPMA;
  1577   1551     if( nPMA>pSorter->mxKeysize ){
  1578   1552       pSorter->mxKeysize = nPMA;
  1579   1553     }
  1580   1554   
  1581         -  if( pSorter->aMemory ){
         1555  +  if( pSorter->list.aMemory ){
  1582   1556       int nMin = pSorter->iMemory + nReq;
  1583   1557   
  1584   1558       if( nMin>pSorter->nMemory ){
  1585   1559         u8 *aNew;
  1586   1560         int nNew = pSorter->nMemory * 2;
  1587   1561         while( nNew < nMin ) nNew = nNew*2;
  1588   1562         if( nNew > pSorter->mxPmaSize ) nNew = pSorter->mxPmaSize;
  1589   1563         if( nNew < nMin ) nNew = nMin;
  1590   1564   
  1591         -      aNew = sqlite3Realloc(pSorter->aMemory, nNew);
         1565  +      aNew = sqlite3Realloc(pSorter->list.aMemory, nNew);
  1592   1566         if( !aNew ) return SQLITE_NOMEM;
  1593         -      pSorter->pRecord = (SorterRecord*)(
  1594         -          aNew + ((u8*)pSorter->pRecord - pSorter->aMemory)
         1567  +      pSorter->list.pList = (SorterRecord*)(
         1568  +          aNew + ((u8*)pSorter->list.pList - pSorter->list.aMemory)
  1595   1569         );
  1596         -      pSorter->aMemory = aNew;
         1570  +      pSorter->list.aMemory = aNew;
  1597   1571         pSorter->nMemory = nNew;
  1598   1572       }
  1599   1573   
  1600         -    pNew = (SorterRecord*)&pSorter->aMemory[pSorter->iMemory];
         1574  +    pNew = (SorterRecord*)&pSorter->list.aMemory[pSorter->iMemory];
  1601   1575       pSorter->iMemory += ROUND8(nReq);
  1602         -    pNew->u.iNext = (u8*)(pSorter->pRecord) - pSorter->aMemory;
         1576  +    pNew->u.iNext = (u8*)(pSorter->list.pList) - pSorter->list.aMemory;
  1603   1577     }else{
  1604   1578       pNew = (SorterRecord *)sqlite3Malloc(nReq);
  1605   1579       if( pNew==0 ){
  1606   1580         return SQLITE_NOMEM;
  1607   1581       }
  1608         -    pNew->u.pNext = pSorter->pRecord;
         1582  +    pNew->u.pNext = pSorter->list.pList;
  1609   1583     }
  1610   1584   
  1611   1585     memcpy(SRVAL(pNew), pVal->z, pVal->n);
  1612   1586     pNew->nVal = pVal->n;
  1613         -  pSorter->pRecord = pNew;
         1587  +  pSorter->list.pList = pNew;
  1614   1588   
  1615   1589     return rc;
  1616   1590   }
  1617   1591   
  1618         -/*
  1619         -** Return the total number of PMAs in all temporary files.
  1620         -*/
  1621         -static int vdbeSorterCountPMA(VdbeSorter *pSorter){
  1622         -  int nPMA = 0;
  1623         -  int i;
  1624         -  for(i=0; i<pSorter->nTask; i++){
  1625         -    nPMA += pSorter->aTask[i].nPMA;
  1626         -  }
  1627         -  return nPMA;
  1628         -}
  1629         -
  1630   1592   /*
  1631   1593   ** Read keys from pIncr->pMerger and populate pIncr->aFile[1]. The format
  1632   1594   ** of the data stored in aFile[1] is the same as that used by regular PMAs,
  1633   1595   ** except that the number-of-bytes varint is omitted from the start.
  1634   1596   */
  1635   1597   static int vdbeIncrPopulate(IncrMerger *pIncr){
  1636   1598     int rc = SQLITE_OK;
................................................................................
  1663   1625   
  1664   1626     rc2 = vdbePmaWriterFinish(&writer, &pOut->iEof);
  1665   1627     if( rc==SQLITE_OK ) rc = rc2;
  1666   1628     vdbeSorterPopulateDebug(pIncr->pTask, "exit");
  1667   1629     return rc;
  1668   1630   }
  1669   1631   
  1670         -static void *vdbeIncrPopulateThreadMain(void *pCtx){
         1632  +static void *vdbeIncrPopulateThread(void *pCtx){
  1671   1633     IncrMerger *pIncr = (IncrMerger*)pCtx;
  1672         -  return SQLITE_INT_TO_PTR( vdbeIncrPopulate(pIncr) );
         1634  +  void *pRet = SQLITE_INT_TO_PTR( vdbeIncrPopulate(pIncr) );
         1635  +  pIncr->thread.bDone = 1;
         1636  +  return pRet;
  1673   1637   }
  1674   1638   
  1675         -static int vdbeIncrBgPopulate(IncrMerger *pIncr){
  1676         -  int rc;
  1677         -  assert( pIncr->pThread==0 );
  1678         -  if( pIncr->bUseThread==0 ){
  1679         -    rc = vdbeIncrPopulate(pIncr);
  1680         -  }
  1681   1639   #if SQLITE_MAX_WORKER_THREADS>0
  1682         -  else{
  1683         -    void *pCtx = (void*)pIncr;
  1684         -    rc = sqlite3ThreadCreate(&pIncr->pThread, vdbeIncrPopulateThreadMain, pCtx);
  1685         -  }
         1640  +static int vdbeIncrBgPopulate(IncrMerger *pIncr){
         1641  +  void *pCtx = (void*)pIncr;
         1642  +  assert( pIncr->bUseThread );
         1643  +  return vdbeSorterCreateThread(&pIncr->thread, vdbeIncrPopulateThread, pCtx);
         1644  +}
  1686   1645   #endif
  1687         -  return rc;
  1688         -}
  1689   1646   
  1690   1647   static int vdbeIncrSwap(IncrMerger *pIncr){
  1691   1648     int rc = SQLITE_OK;
  1692   1649   
  1693         -  if( pIncr->bUseThread ){
  1694   1650   #if SQLITE_MAX_WORKER_THREADS>0
  1695         -    if( pIncr->pThread ){
  1696         -      void *pRet;
  1697         -      assert( pIncr->bUseThread );
  1698         -      rc = sqlite3ThreadJoin(pIncr->pThread, &pRet);
  1699         -      if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet);
  1700         -      pIncr->pThread = 0;
  1701         -    }
  1702         -#endif
         1651  +  if( pIncr->bUseThread ){
         1652  +    rc = vdbeSorterJoinThread(pIncr->pTask, &pIncr->thread);
  1703   1653   
  1704   1654       if( rc==SQLITE_OK ){
  1705   1655         SorterFile f0 = pIncr->aFile[0];
  1706   1656         pIncr->aFile[0] = pIncr->aFile[1];
  1707   1657         pIncr->aFile[1] = f0;
  1708   1658       }
  1709   1659   
................................................................................
  1710   1660       if( rc==SQLITE_OK ){
  1711   1661         if( pIncr->aFile[0].iEof==pIncr->iStartOff ){
  1712   1662           pIncr->bEof = 1;
  1713   1663         }else{
  1714   1664           rc = vdbeIncrBgPopulate(pIncr);
  1715   1665         }
  1716   1666       }
  1717         -  }else{
         1667  +  }else
         1668  +#endif
         1669  +  {
  1718   1670       rc = vdbeIncrPopulate(pIncr);
  1719   1671       pIncr->aFile[0] = pIncr->aFile[1];
  1720   1672       if( pIncr->aFile[0].iEof==pIncr->iStartOff ){
  1721   1673         pIncr->bEof = 1;
  1722   1674       }
  1723   1675     }
  1724   1676   
  1725   1677     return rc;
  1726   1678   }
  1727   1679   
  1728   1680   static void vdbeIncrFree(IncrMerger *pIncr){
  1729   1681     if( pIncr ){
  1730   1682   #if SQLITE_MAX_WORKER_THREADS>0
  1731         -    if( pIncr->pThread ){
  1732         -      void *pRet;
  1733         -      sqlite3ThreadJoin(pIncr->pThread, &pRet);
  1734         -    }
         1683  +    vdbeSorterJoinThread(pIncr->pTask, &pIncr->thread);
  1735   1684       if( pIncr->bUseThread ){
  1736   1685         if( pIncr->aFile[0].pFd ) sqlite3OsCloseFree(pIncr->aFile[0].pFd);
  1737   1686         if( pIncr->aFile[1].pFd ) sqlite3OsCloseFree(pIncr->aFile[1].pFd);
  1738   1687       }
  1739   1688   #endif
  1740   1689       vdbeMergeEngineFree(pIncr->pMerger);
  1741   1690       sqlite3_free(pIncr);
................................................................................
  1746   1695     IncrMerger *pIncr = sqlite3_malloc(sizeof(IncrMerger));
  1747   1696     if( pIncr ){
  1748   1697       memset(pIncr, 0, sizeof(IncrMerger));
  1749   1698       pIncr->pMerger = pMerger;
  1750   1699       pIncr->pTask = pTask;
  1751   1700       pIncr->mxSz = MAX(pTask->pSorter->mxKeysize+9,pTask->pSorter->mxPmaSize/2);
  1752   1701       pTask->file2.iEof += pIncr->mxSz;
  1753         -
  1754         -#if 0
  1755         -    /* Open the two temp files. */
  1756         -    rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[0].pFd);
  1757         -    if( rc==SQLITE_OK ){
  1758         -      rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[1].pFd);
  1759         -    }
  1760         -    if( rc!=SQLITE_OK ){
  1761         -      vdbeIncrFree(pIncr);
  1762         -      pIncr = 0;
  1763         -    }
  1764         -#endif
  1765   1702     }
  1766   1703     return pIncr;
  1767   1704   }
  1768   1705   
  1769   1706   static void vdbeIncrSetThreads(IncrMerger *pIncr, int bUseThread){
  1770   1707     if( bUseThread ){
  1771   1708       pIncr->bUseThread = 1;
................................................................................
  1780   1717       SortSubtask *pTask = pIncr->pTask;
  1781   1718       int i;
  1782   1719       MergeEngine *pMerger = pIncr->pMerger;
  1783   1720   
  1784   1721       for(i=0; rc==SQLITE_OK && i<pMerger->nTree; i++){
  1785   1722         rc = vdbeIncrInit2(&pMerger->aIter[i]);
  1786   1723       }
  1787         -    for(i=pMerger->nTree-1; rc==SQLITE_OK && i>0; i--){
  1788         -      rc = vdbeSorterDoCompare(pIncr->pTask, pMerger, i);
  1789         -    }
  1790   1724   
  1791   1725       /* Set up the required files for pIncr */
  1792   1726       if( rc==SQLITE_OK ){
  1793   1727         if( pIncr->bUseThread==0 ){
  1794   1728           if( pTask->file2.pFd==0 ){
  1795   1729             rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTask->file2.pFd);
  1796   1730             assert( pTask->file2.iEof>0 );
................................................................................
  1807   1741         }else{
  1808   1742           rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[0].pFd);
  1809   1743           if( rc==SQLITE_OK ){
  1810   1744             rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[1].pFd);
  1811   1745           }
  1812   1746         }
  1813   1747       }
         1748  +
         1749  +    for(i=pMerger->nTree-1; rc==SQLITE_OK && i>0; i--){
         1750  +      rc = vdbeSorterDoCompare(pIncr->pTask, pMerger, i);
         1751  +    }
  1814   1752   
  1815   1753       if( rc==SQLITE_OK && pIncr->bUseThread ){
  1816   1754         rc = vdbeIncrBgPopulate(pIncr);
  1817   1755       }
  1818   1756   
  1819   1757       if( rc==SQLITE_OK ){
  1820   1758         rc = vdbePmaReaderNext(pIter);
................................................................................
  1994   1932               vdbeIncrSetThreads(pIncr, pSorter->bUseThreads);
  1995   1933               assert( pIncr->pTask!=pLast );
  1996   1934             }
  1997   1935           }
  1998   1936         }
  1999   1937       }
  2000   1938     }
  2001         -  if( rc==SQLITE_OK ){
  2002         -    rc = vdbeIncrInit2(pIter);
  2003         -  }
         1939  +  if( rc==SQLITE_OK ) rc = vdbeIncrInit2(pIter);
  2004   1940   
  2005   1941     sqlite3_free(aMerge);
  2006   1942     return rc;
  2007   1943   }
  2008   1944   
  2009   1945   
  2010   1946   /*
................................................................................
  2018   1954   
  2019   1955     assert( pSorter );
  2020   1956   
  2021   1957     /* If no data has been written to disk, then do not do so now. Instead,
  2022   1958     ** sort the VdbeSorter.pRecord list. The vdbe layer will read data directly
  2023   1959     ** from the in-memory list.  */
  2024   1960     if( pSorter->bUsePMA==0 ){
  2025         -    if( pSorter->pRecord ){
  2026         -      SortSubtask *pTask = &pSorter->aTask[0];
         1961  +    if( pSorter->list.pList ){
  2027   1962         *pbEof = 0;
  2028         -      pTask->pList = pSorter->pRecord;
  2029         -      pTask->eWork = SORT_SUBTASK_SORT;
  2030         -      assert( pTask->aListMemory==0 );
  2031         -      pTask->aListMemory = pSorter->aMemory;
  2032         -      rc = vdbeSorterRunTask(pTask);
  2033         -      pTask->aListMemory = 0;
  2034         -      pSorter->pRecord = pTask->pList;
  2035         -      pTask->pList = 0;
         1963  +      rc = vdbeSorterSort(&pSorter->aTask[0], &pSorter->list);
  2036   1964       }else{
  2037   1965         *pbEof = 1;
  2038   1966       }
  2039   1967       return rc;
  2040   1968     }
  2041   1969   
  2042   1970     /* Write the current in-memory list to a PMA. */
  2043         -  if( pSorter->pRecord ){
  2044         -    rc = vdbeSorterFlushPMA(db, pCsr, 1);
         1971  +  if( pSorter->list.pList ){
         1972  +    rc = vdbeSorterFlushPMA(pSorter);
  2045   1973     }
  2046   1974   
  2047   1975     /* Join all threads */
  2048   1976     rc = vdbeSorterJoinAll(pSorter, rc);
  2049   1977   
  2050   1978     vdbeSorterRewindDebug(db, "rewind");
  2051   1979   
................................................................................
  2072   2000     VdbeSorter *pSorter = pCsr->pSorter;
  2073   2001     int rc;                         /* Return code */
  2074   2002   
  2075   2003     if( pSorter->pReader ){
  2076   2004       rc = vdbePmaReaderNext(pSorter->pReader);
  2077   2005       *pbEof = (pSorter->pReader->pFile==0);
  2078   2006     }else{
  2079         -    SorterRecord *pFree = pSorter->pRecord;
  2080         -    pSorter->pRecord = pFree->u.pNext;
         2007  +    SorterRecord *pFree = pSorter->list.pList;
         2008  +    pSorter->list.pList = pFree->u.pNext;
  2081   2009       pFree->u.pNext = 0;
  2082         -    if( pSorter->aMemory==0 ) vdbeSorterRecordFree(db, pFree);
  2083         -    *pbEof = !pSorter->pRecord;
         2010  +    if( pSorter->list.aMemory==0 ) vdbeSorterRecordFree(db, pFree);
         2011  +    *pbEof = !pSorter->list.pList;
  2084   2012       rc = SQLITE_OK;
  2085   2013     }
  2086   2014     return rc;
  2087   2015   }
  2088   2016   
  2089   2017   /*
  2090   2018   ** Return a pointer to a buffer owned by the sorter that contains the 
................................................................................
  2095   2023     int *pnKey                      /* OUT: Size of current key in bytes */
  2096   2024   ){
  2097   2025     void *pKey;
  2098   2026     if( pSorter->pReader ){
  2099   2027       *pnKey = pSorter->pReader->nKey;
  2100   2028       pKey = pSorter->pReader->aKey;
  2101   2029     }else{
  2102         -    *pnKey = pSorter->pRecord->nVal;
  2103         -    pKey = SRVAL(pSorter->pRecord);
         2030  +    *pnKey = pSorter->list.pList->nVal;
         2031  +    pKey = SRVAL(pSorter->list.pList);
  2104   2032     }
  2105   2033     return pKey;
  2106   2034   }
  2107   2035   
  2108   2036   /*
  2109   2037   ** Copy the current sorter key into the memory cell pOut.
  2110   2038   */