/ Check-in [cbfc0f6d]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Experimental multi-threaded sorting changes to begin merging PMAs before SorterRewind() is called.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | abandoned
Files: files | file ages | folders
SHA1:cbfc0f6d49b6607bb4eb45bfe4c258e39ac27403
User & Date: dan 2014-04-07 18:44:00
Context
2014-04-07
18:44
Experimental multi-threaded sorting changes to begin merging PMAs before SorterRewind() is called. Closed-Leaf check-in: cbfc0f6d user: dan tags: abandoned
2014-04-04
22:44
Fix harmless compiler warnings. check-in: e54dded2 user: drh tags: threads
Changes
Hide Diffs Side-by-Side Diffs Ignore Whitespace Patch

Changes to src/shell.c.

  3531   3531     memcpy(data->separator,"|", 2);
  3532   3532     data->showHeader = 0;
  3533   3533     sqlite3_config(SQLITE_CONFIG_URI, 1);
  3534   3534     sqlite3_config(SQLITE_CONFIG_LOG, shellLog, data);
  3535   3535     sqlite3_snprintf(sizeof(mainPrompt), mainPrompt,"sqlite> ");
  3536   3536     sqlite3_snprintf(sizeof(continuePrompt), continuePrompt,"   ...> ");
  3537   3537     sqlite3_config(SQLITE_CONFIG_MULTITHREAD);
  3538         -  sqlite3_config(SQLITE_CONFIG_WORKER_THREADS, 3);
         3538  +  sqlite3_config(SQLITE_CONFIG_WORKER_THREADS, 4);
  3539   3539   }
  3540   3540   
  3541   3541   /*
  3542   3542   ** Output text to the console in a font that attracts extra attention.
  3543   3543   */
  3544   3544   #ifdef _WIN32
  3545   3545   static void printBold(const char *zText){

Changes to src/vdbesort.c.

    93     93   */
    94     94   typedef struct MergeEngine MergeEngine;     /* Merge PMAs together */
    95     95   typedef struct PmaReader PmaReader;         /* Incrementally read one PMA */
    96     96   typedef struct PmaWriter PmaWriter;         /* Incrementally write on PMA */
    97     97   typedef struct SorterRecord SorterRecord;   /* A record being sorted */
    98     98   typedef struct SortSubtask SortSubtask;     /* A sub-task in the sort process */
    99     99   
          100  +typedef struct SortList SortList;
          101  +typedef struct SortFile SortFile;
          102  +typedef struct SortLevel SortLevel;
          103  +
   100    104   
   101    105   /*
   102         -** Candidate values for SortSubtask.eWork
          106  +** A file containing zero or more PMAs.
   103    107   */
   104         -#define SORT_SUBTASK_SORT   1     /* Sort records on pList */
   105         -#define SORT_SUBTASK_TO_PMA 2     /* Xfer pList to Packed-Memory-Array pTemp1 */
   106         -#define SORT_SUBTASK_CONS   3     /* Consolidate multiple PMAs */
          108  +struct SortFile {
          109  +  sqlite3_file *pFd;              /* File handle */
          110  +  i64 iOff;                       /* Current write offset */
          111  +  i64 nByte;                      /* Actual size of file */
          112  +  int nPMA;                       /* Number of PMA currently in file */
          113  +};
          114  +
          115  +/*
          116  +** A list of records.
          117  +*/
          118  +struct SortList {
          119  +  SorterRecord *pRecord;          /* List of records for pTask to sort */
          120  +  int nInMemory;                  /* Expected size of PMA based on pList */
          121  +  u8 *aMemory;                    /* Records memory (or NULL) */
          122  +};
          123  +
          124  +struct SortLevel {
          125  +  SortSubtask *pTask;             /* Sorter task this level is a part of */
          126  +  SQLiteThread *pThread;          /* Thread handle, or NULL */
          127  +  int bDone;                      /* Set to true by pThread when finished */
          128  +  union {
          129  +    SortFile f;                   /* Input for level 1 and greater */
          130  +    SortList l;                   /* Input for level 0 */
          131  +  } in;
          132  +  SortFile out;                   /* Level storage */
          133  +  SortLevel *pNext;               /* Next level (containing larger PMAs) */
          134  +  UnpackedRecord *pUnpacked;      /* Space to unpack a record */
          135  +};
   107    136   
   108    137   /*
   109    138   ** Sorting is divided up into smaller subtasks.  Each subtask is controlled
   110    139   ** by an instance of this object. A Subtask might run in either the main thread
   111    140   ** or in a background thread.
   112    141   **
   113    142   ** Exactly VdbeSorter.nTask instances of this object are allocated
................................................................................
   137    166   **     the temp file if it is not already open.
   138    167   **
   139    168   **   SORT_SUBTASK_CONS:
   140    169   **     Merge existing PMAs until SortSubtask.nConsolidate or fewer
   141    170   **     remain in temp file SortSubtask.pTemp1.
   142    171   */
   143    172   struct SortSubtask {
   144         -  SQLiteThread *pThread;          /* Thread handle, or NULL */
   145         -  int bDone;                      /* Set to true by pTask when finished */
   146         -
          173  +  int iId;                        /* Sub-task id */
   147    174     sqlite3 *db;                    /* Database connection */
          175  +  VdbeSorter *pSorter;            /* Sorter that owns this object */
   148    176     KeyInfo *pKeyInfo;              /* How to compare records */
   149         -  UnpackedRecord *pUnpacked;      /* Space to unpack a record */
   150    177     int pgsz;                       /* Main database page size */
   151         -
   152         -  u8 eWork;                       /* One of the SORT_SUBTASK_* constants */
   153         -  int nConsolidate;               /* For SORT_SUBTASK_CONS, max final PMAs */
   154         -  SorterRecord *pList;            /* List of records for pTask to sort */
   155         -  int nInMemory;                  /* Expected size of PMA based on pList */
   156         -  u8 *aListMemory;                /* Records memory (or NULL) */
   157         -
   158         -  int nPMA;                       /* Number of PMAs currently in pTemp1 */
   159         -  i64 iTemp1Off;                  /* Offset to write to in pTemp1 */
   160         -  sqlite3_file *pTemp1;           /* File to write PMAs to, or NULL */
          178  +  int nConsolidate;               /* For consolidation, max final PMAs */
          179  +  SortLevel *pLevel;              /* PMA level 0 */
   161    180   };
   162    181   
   163    182   
   164    183   /*
   165    184   ** The MergeEngine object is used to combine two or more smaller PMAs into
   166    185   ** one big PMA using a merge operation.  Separate PMAs all need to be
   167    186   ** combined into one big PMA in order to be able to step through the sorted
................................................................................
   231    250   };
   232    251   
   233    252   /*
   234    253   ** Main sorter structure. A single instance of this is allocated for each 
   235    254   ** sorter cursor created by the VDBE.
   236    255   */
   237    256   struct VdbeSorter {
   238         -  int nInMemory;                  /* Current size of pRecord list as PMA */
   239    257     int mnPmaSize;                  /* Minimum PMA size, in bytes */
   240    258     int mxPmaSize;                  /* Maximum PMA size, in bytes.  0==no limit */
   241    259     int bUsePMA;                    /* True if one or more PMAs created */
   242         -  SorterRecord *pRecord;          /* Head of in-memory record list */
   243    260     MergeEngine *pMerger;           /* For final merge of PMAs (by caller) */ 
   244         -  u8 *aMemory;                    /* Block of memory to alloc records from */
   245         -  int iMemory;                    /* Offset of first free byte in aMemory */
   246         -  int nMemory;                    /* Size of aMemory allocation in bytes */
   247         -  int iPrev;                      /* Previous thread used to flush PMA */
          261  +  UnpackedRecord *pUnpacked;      /* Used by sqlite3VdbeSorterCompare */
          262  +  int iMemory;                    /* Offset of free byte in list.aMemory */
          263  +  int nMemory;                    /* Size of list.aMemory allocation in bytes */
          264  +  SortList list;                  /* In memory records */
          265  +  int iPrev;                      /* Previous PMA flushed via task iPrev */
   248    266     int nTask;                      /* Size of aTask[] array */
   249    267     SortSubtask aTask[1];           /* One or more subtasks */
   250    268   };
   251    269   
   252    270   /*
   253    271   ** An instance of the following object is used to read records out of a
   254    272   ** PMA, in sorted order.  The next key to be read is cached in nKey/aKey.
................................................................................
   492    510   ** Initialize iterator pIter to scan through the PMA stored in file pFile
   493    511   ** starting at offset iStart and ending at offset iEof-1. This function 
   494    512   ** leaves the iterator pointing to the first key in the PMA (or EOF if the 
   495    513   ** PMA is empty).
   496    514   */
   497    515   static int vdbePmaReaderInit(
   498    516     SortSubtask *pTask,             /* Thread context */
          517  +  SortFile *pFile,                /* File to read from */
   499    518     i64 iStart,                     /* Start offset in pTask->pTemp1 */
   500    519     PmaReader *pIter,               /* Iterator to populate */
   501    520     i64 *pnByte                     /* IN/OUT: Increment this value by PMA size */
   502    521   ){
   503    522     int rc = SQLITE_OK;
   504    523     int nBuf = pTask->pgsz;
   505    524     void *pMap = 0;                 /* Mapping of temp file */
   506    525   
   507         -  assert( pTask->iTemp1Off>iStart );
          526  +  assert( pFile->iOff>iStart );
   508    527     assert( pIter->aAlloc==0 );
   509    528     assert( pIter->aBuffer==0 );
   510         -  pIter->pFile = pTask->pTemp1;
          529  +  pIter->pFile = pFile->pFd;
   511    530     pIter->iReadOff = iStart;
   512    531     pIter->nAlloc = 128;
   513    532     pIter->aAlloc = (u8*)sqlite3Malloc(pIter->nAlloc);
   514    533     if( pIter->aAlloc ){
   515    534       /* Try to xFetch() a mapping of the entire temp file. If this is possible,
   516    535       ** the PMA will be read via the mapping. Otherwise, use xRead().  */
   517         -    if( pTask->iTemp1Off<=(i64)(pTask->db->nMaxSorterMmap) ){
   518         -      rc = sqlite3OsFetch(pIter->pFile, 0, pTask->iTemp1Off, &pMap);
          536  +    if( pFile->iOff<=(i64)(pTask->db->nMaxSorterMmap) ){
          537  +      rc = sqlite3OsFetch(pIter->pFile, 0, pFile->iOff, &pMap);
   519    538       }
   520    539     }else{
   521    540       rc = SQLITE_NOMEM;
   522    541     }
   523    542   
   524    543     if( rc==SQLITE_OK ){
   525    544       if( pMap ){
................................................................................
   529    548         pIter->aBuffer = (u8*)sqlite3Malloc(nBuf);
   530    549         if( !pIter->aBuffer ){
   531    550           rc = SQLITE_NOMEM;
   532    551         }else{
   533    552           int iBuf = iStart % nBuf;
   534    553           if( iBuf ){
   535    554             int nRead = nBuf - iBuf;
   536         -          if( (iStart + nRead) > pTask->iTemp1Off ){
   537         -            nRead = (int)(pTask->iTemp1Off - iStart);
          555  +          if( (iStart + nRead) > pFile->iOff ){
          556  +            nRead = (int)(pFile->iOff - iStart);
   538    557             }
   539    558             rc = sqlite3OsRead(
   540         -              pTask->pTemp1, &pIter->aBuffer[iBuf], nRead, iStart
   541         -              );
          559  +              pIter->pFile, &pIter->aBuffer[iBuf], nRead, iStart
          560  +          );
   542    561             assert( rc!=SQLITE_IOERR_SHORT_READ );
   543    562           }
   544    563         }
   545    564       }
   546    565     }
   547    566   
   548    567     if( rc==SQLITE_OK ){
   549    568       u64 nByte;                    /* Size of PMA in bytes */
   550         -    pIter->iEof = pTask->iTemp1Off;
          569  +    pIter->iEof = pFile->iOff;
   551    570       rc = vdbePmaReadVarint(pIter, &nByte);
   552    571       pIter->iEof = pIter->iReadOff + nByte;
   553    572       *pnByte += nByte;
   554    573     }
   555    574   
   556    575     if( rc==SQLITE_OK ){
   557    576       rc = vdbePmaReaderNext(pIter);
................................................................................
   558    577     }
   559    578     return rc;
   560    579   }
   561    580   
   562    581   
   563    582   /*
   564    583   ** Compare key1 (buffer pKey1, size nKey1 bytes) with key2 (buffer pKey2, 
   565         -** size nKey2 bytes). Use (pTask->pKeyInfo) for the collation sequences
          584  +** size nKey2 bytes). Use pKeyInfo for the collation sequences
   566    585   ** used by the comparison. Return the result of the comparison.
   567    586   **
   568         -** Before returning, object (pTask->pUnpacked) is populated with the
          587  +** Before returning, object pUnpacked is populated with the
   569    588   ** unpacked version of key2. Or, if pKey2 is passed a NULL pointer, then it 
   570         -** is assumed that the (pTask->pUnpacked) structure already contains the 
          589  +** is assumed that the pUnpacked structure already contains the 
   571    590   ** unpacked key to use as key2.
   572    591   **
   573         -** If an OOM error is encountered, (pTask->pUnpacked->error_rc) is set
          592  +** If an OOM error is encountered, (pUnpacked->error_rc) is set
   574    593   ** to SQLITE_NOMEM.
   575    594   */
   576    595   static int vdbeSorterCompare(
   577         -  SortSubtask *pTask,             /* Subtask context (for pKeyInfo) */
          596  +  KeyInfo *pKeyInfo,
          597  +  UnpackedRecord *r2,
   578    598     const void *pKey1, int nKey1,   /* Left side of comparison */
   579    599     const void *pKey2, int nKey2    /* Right side of comparison */
   580    600   ){
   581         -  UnpackedRecord *r2 = pTask->pUnpacked;
   582    601     if( pKey2 ){
   583         -    sqlite3VdbeRecordUnpack(pTask->pKeyInfo, nKey2, pKey2, r2);
          602  +    sqlite3VdbeRecordUnpack(pKeyInfo, nKey2, pKey2, r2);
   584    603     }
   585    604     return sqlite3VdbeRecordCompare(nKey1, pKey1, r2, 0);
   586    605   }
   587    606   
   588    607   /*
   589    608   ** This function is called to compare two iterator keys when merging 
   590    609   ** multiple b-tree segments. Parameter iOut is the index of the aTree[] 
   591    610   ** value to recalculate.
   592    611   */
   593    612   static int vdbeSorterDoCompare(
   594         -  SortSubtask *pTask, 
          613  +  SortLevel *pLvl, 
   595    614     MergeEngine *pMerger, 
   596    615     int iOut
   597    616   ){
   598    617     int i1;
   599    618     int i2;
   600    619     int iRes;
   601    620     PmaReader *p1;
................................................................................
   616    635   
   617    636     if( p1->pFile==0 ){
   618    637       iRes = i2;
   619    638     }else if( p2->pFile==0 ){
   620    639       iRes = i1;
   621    640     }else{
   622    641       int res;
   623         -    assert( pTask->pUnpacked!=0 );  /* allocated in vdbeSortSubtaskMain() */
   624         -    res = vdbeSorterCompare(
   625         -        pTask, p1->aKey, p1->nKey, p2->aKey, p2->nKey
          642  +    assert( pLvl->pUnpacked!=0 );  /* allocated in vdbeSorterThread() */
          643  +    res = vdbeSorterCompare(pLvl->pTask->pKeyInfo, pLvl->pUnpacked,
          644  +        p1->aKey, p1->nKey, p2->aKey, p2->nKey
   626    645       );
   627    646       if( res<=0 ){
   628    647         iRes = i1;
   629    648       }else{
   630    649         iRes = i2;
   631    650       }
   632    651     }
................................................................................
   670    689   
   671    690       pSorter->nTask = nWorker + 1;
   672    691       for(i=0; i<pSorter->nTask; i++){
   673    692         SortSubtask *pTask = &pSorter->aTask[i];
   674    693         pTask->pKeyInfo = pKeyInfo;
   675    694         pTask->pgsz = pgsz;
   676    695         pTask->db = db;
          696  +      pTask->pSorter = pSorter;
   677    697       }
   678    698   
   679    699       if( !sqlite3TempInMemory(db) ){
   680    700         pSorter->mnPmaSize = SORTER_MIN_WORKING * pgsz;
   681    701         mxCache = db->aDb[0].pSchema->cache_size;
   682    702         if( mxCache<SORTER_MIN_WORKING ) mxCache = SORTER_MIN_WORKING;
   683    703         pSorter->mxPmaSize = mxCache * pgsz;
................................................................................
   684    704   
   685    705         /* If the application is using memsys3 or memsys5, use a separate 
   686    706         ** allocation for each sort-key in memory. Otherwise, use a single big
   687    707         ** allocation at pSorter->aMemory for all sort-keys.  */
   688    708         if( sqlite3GlobalConfig.pHeap==0 ){
   689    709           assert( pSorter->iMemory==0 );
   690    710           pSorter->nMemory = pgsz;
   691         -        pSorter->aMemory = (u8*)sqlite3Malloc(pgsz);
   692         -        if( !pSorter->aMemory ) rc = SQLITE_NOMEM;
          711  +        pSorter->list.aMemory = (u8*)sqlite3Malloc(pgsz);
          712  +        if( !pSorter->list.aMemory ) rc = SQLITE_NOMEM;
   693    713         }
   694    714       }
   695    715     }
   696    716   
   697    717     return rc;
   698    718   }
   699    719   
................................................................................
   706    726     for(p=pRecord; p; p=pNext){
   707    727       pNext = p->u.pNext;
   708    728       sqlite3DbFree(db, p);
   709    729     }
   710    730   }
   711    731   
   712    732   /*
   713         -** Free all resources owned by the object indicated by argument pTask. All 
   714         -** fields of *pTask are zeroed before returning.
          733  +** Free all resources owned by the object indicated by argument pTask. 
          734  +** This does not include joining any outstanding threads. All fields of 
          735  +** *pTask are zeroed before returning.
   715    736   */
   716    737   static void vdbeSortSubtaskCleanup(sqlite3 *db, SortSubtask *pTask){
   717         -  sqlite3DbFree(db, pTask->pUnpacked);
   718         -  pTask->pUnpacked = 0;
   719         -  if( pTask->aListMemory==0 ){
   720         -    vdbeSorterRecordFree(0, pTask->pList);
   721         -  }else{
   722         -    sqlite3_free(pTask->aListMemory);
   723         -    pTask->aListMemory = 0;
          738  +  SortLevel *pLvl;
          739  +  SortLevel *pNext;
          740  +  for(pLvl=pTask->pLevel; pLvl; pLvl=pNext){
          741  +    pNext = pLvl->pNext;
          742  +    assert( pLvl->pThread==0 );
          743  +    if( pLvl==pTask->pLevel ){
          744  +      if( pLvl->in.l.aMemory==0 ){
          745  +        vdbeSorterRecordFree(0, pLvl->in.l.pRecord);
          746  +      }else{
          747  +        sqlite3_free(pLvl->in.l.aMemory);
          748  +      }
          749  +    }else{
          750  +      if( pLvl->in.f.pFd ) sqlite3OsCloseFree(pLvl->in.f.pFd);
          751  +    }
          752  +    if( pLvl->out.pFd ) sqlite3OsCloseFree(pLvl->out.pFd);
          753  +    sqlite3DbFree(db, pLvl->pUnpacked);
          754  +    sqlite3_free(pLvl);
   724    755     }
   725         -  pTask->pList = 0;
   726         -  if( pTask->pTemp1 ){
   727         -    sqlite3OsCloseFree(pTask->pTemp1);
   728         -    pTask->pTemp1 = 0;
   729         -  }
          756  +  pTask->pLevel = 0;
          757  +  pTask->nConsolidate = 0;
   730    758   }
   731    759   
   732    760   /*
   733    761   ** Join all threads.  
   734    762   */
   735    763   #if SQLITE_MAX_WORKER_THREADS>0
   736    764   static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){
   737    765     int rc = rcin;
   738    766     int i;
   739    767     for(i=0; i<pSorter->nTask; i++){
   740    768       SortSubtask *pTask = &pSorter->aTask[i];
   741         -    if( pTask->pThread ){
   742         -      void *pRet;
   743         -      int rc2 = sqlite3ThreadJoin(pTask->pThread, &pRet);
   744         -      pTask->pThread = 0;
   745         -      pTask->bDone = 0;
   746         -      if( rc==SQLITE_OK ) rc = rc2;
   747         -      if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet);
          769  +    SortLevel *pLvl;
          770  +    for(pLvl=pTask->pLevel; pLvl; pLvl=pLvl->pNext){
          771  +      if( pLvl->pThread ){
          772  +        void *pRet;
          773  +        int rc2 = sqlite3ThreadJoin(pLvl->pThread, &pRet);
          774  +        pLvl->pThread = 0;
          775  +        pLvl->bDone = 0;
          776  +        if( rc==SQLITE_OK ) rc = rc2;
          777  +        if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet);
          778  +      }
   748    779       }
   749    780     }
   750    781     return rc;
   751    782   }
   752    783   #else
   753    784   # define vdbeSorterJoinAll(x,rcin) (rcin)
   754    785   #endif
................................................................................
   795    826     (void)vdbeSorterJoinAll(pSorter, SQLITE_OK);
   796    827     vdbeMergeEngineFree(pSorter->pMerger);
   797    828     pSorter->pMerger = 0;
   798    829     for(i=0; i<pSorter->nTask; i++){
   799    830       SortSubtask *pTask = &pSorter->aTask[i];
   800    831       vdbeSortSubtaskCleanup(db, pTask);
   801    832     }
   802         -  if( pSorter->aMemory==0 ){
   803         -    vdbeSorterRecordFree(0, pSorter->pRecord);
          833  +  if( pSorter->list.aMemory==0 ){
          834  +    vdbeSorterRecordFree(0, pSorter->list.pRecord);
   804    835     }
   805         -  pSorter->pRecord = 0;
   806         -  pSorter->nInMemory = 0;
          836  +  pSorter->list.pRecord = 0;
          837  +  pSorter->list.nInMemory = 0;
   807    838     pSorter->bUsePMA = 0;
   808    839     pSorter->iMemory = 0;
   809    840   }
   810    841   
   811    842   /*
   812    843   ** Free any cursor components allocated by sqlite3VdbeSorterXXX routines.
   813    844   */
   814    845   void sqlite3VdbeSorterClose(sqlite3 *db, VdbeCursor *pCsr){
   815    846     VdbeSorter *pSorter = pCsr->pSorter;
   816    847     if( pSorter ){
   817    848       sqlite3VdbeSorterReset(db, pSorter);
   818    849       vdbeMergeEngineFree(pSorter->pMerger);
   819         -    sqlite3_free(pSorter->aMemory);
          850  +    sqlite3_free(pSorter->list.aMemory);
          851  +    sqlite3DbFree(db, pSorter->pUnpacked);
   820    852       sqlite3DbFree(db, pSorter);
   821    853       pCsr->pSorter = 0;
   822    854     }
   823    855   }
   824    856   
   825    857   /*
   826    858   ** Allocate space for a file-handle and open a temporary file. If successful,
................................................................................
   842    874   }
   843    875   
   844    876   /*
   845    877   ** Merge the two sorted lists p1 and p2 into a single list.
   846    878   ** Set *ppOut to the head of the new list.
   847    879   */
   848    880   static void vdbeSorterMerge(
   849         -  SortSubtask *pTask,             /* Calling thread context */
          881  +  KeyInfo *pKeyInfo,
          882  +  UnpackedRecord *r2,
   850    883     SorterRecord *p1,               /* First list to merge */
   851    884     SorterRecord *p2,               /* Second list to merge */
   852    885     SorterRecord **ppOut            /* OUT: Head of merged list */
   853    886   ){
   854    887     SorterRecord *pFinal = 0;
   855    888     SorterRecord **pp = &pFinal;
   856    889     void *pVal2 = p2 ? SRVAL(p2) : 0;
   857    890   
   858    891     while( p1 && p2 ){
   859    892       int res;
   860         -    res = vdbeSorterCompare(pTask, SRVAL(p1), p1->nVal, pVal2, p2->nVal);
          893  +    res = vdbeSorterCompare(pKeyInfo, r2, SRVAL(p1), p1->nVal, pVal2, p2->nVal);
   861    894       if( res<=0 ){
   862    895         *pp = p1;
   863    896         pp = &p1->u.pNext;
   864    897         p1 = p1->u.pNext;
   865    898         pVal2 = 0;
   866    899       }else{
   867    900         *pp = p2;
................................................................................
   872    905       }
   873    906     }
   874    907     *pp = p1 ? p1 : p2;
   875    908     *ppOut = pFinal;
   876    909   }
   877    910   
   878    911   /*
   879         -** Sort the linked list of records headed at pTask->pList. Return 
          912  +** Sort the linked list of records headed at pLvl->in.l.pRecord. Return 
   880    913   ** SQLITE_OK if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if 
   881    914   ** an error occurs.
   882    915   */
   883         -static int vdbeSorterSort(SortSubtask *pTask){
          916  +static int vdbeSorterSort(
          917  +  SortList *pList, 
          918  +  KeyInfo *pKeyInfo, 
          919  +  UnpackedRecord *pUnpacked
          920  +){
   884    921     int i;
   885    922     SorterRecord **aSlot;
   886    923     SorterRecord *p;
   887    924   
   888    925     aSlot = (SorterRecord **)sqlite3MallocZero(64 * sizeof(SorterRecord *));
   889    926     if( !aSlot ){
   890    927       return SQLITE_NOMEM;
   891    928     }
   892    929   
   893         -  p = pTask->pList;
          930  +  p = pList->pRecord;
   894    931     while( p ){
   895    932       SorterRecord *pNext;
   896         -    if( pTask->aListMemory ){
   897         -      if( (u8*)p==pTask->aListMemory ){
          933  +    if( pList->aMemory ){
          934  +      if( (u8*)p==pList->aMemory ){
   898    935           pNext = 0;
   899    936         }else{
   900         -        assert( p->u.iNext<sqlite3MallocSize(pTask->aListMemory) );
   901         -        pNext = (SorterRecord*)&pTask->aListMemory[p->u.iNext];
          937  +        assert( p->u.iNext<sqlite3MallocSize(pList->aMemory) );
          938  +        pNext = (SorterRecord*)&pList->aMemory[p->u.iNext];
   902    939         }
   903    940       }else{
   904    941         pNext = p->u.pNext;
   905    942       }
   906    943   
   907    944       p->u.pNext = 0;
   908    945       for(i=0; aSlot[i]; i++){
   909         -      vdbeSorterMerge(pTask, p, aSlot[i], &p);
          946  +      vdbeSorterMerge(pKeyInfo, pUnpacked, p, aSlot[i], &p);
   910    947         aSlot[i] = 0;
   911    948       }
   912    949       aSlot[i] = p;
   913    950       p = pNext;
   914    951     }
   915    952   
   916    953     p = 0;
   917    954     for(i=0; i<64; i++){
   918         -    vdbeSorterMerge(pTask, p, aSlot[i], &p);
          955  +    vdbeSorterMerge(pKeyInfo, pUnpacked, p, aSlot[i], &p);
   919    956     }
   920         -  pTask->pList = p;
          957  +  pList->pRecord = p;
   921    958   
   922    959     sqlite3_free(aSlot);
   923    960     return SQLITE_OK;
   924    961   }
   925    962   
   926    963   /*
   927    964   ** Initialize a PMA-writer object.
................................................................................
  1013   1050   ** is guaranteed to be nByte bytes or smaller in size. This function
  1014   1051   ** attempts to extend the file to nByte bytes in size and to ensure that
  1015   1052   ** the VFS has memory mapped it.
  1016   1053   **
  1017   1054   ** Whether or not the file does end up memory mapped of course depends on
  1018   1055   ** the specific VFS implementation.
  1019   1056   */
  1020         -static void vdbeSorterExtendFile(sqlite3 *db, sqlite3_file *pFile, i64 nByte){
  1021         -  if( nByte<=(i64)(db->nMaxSorterMmap) ){
  1022         -    int rc = sqlite3OsTruncate(pFile, nByte);
         1057  +static void vdbeSorterExtendFile(
         1058  +  sqlite3 *db, 
         1059  +  SortFile *pFile,
         1060  +  i64 nByte
         1061  +){
         1062  +  if( nByte<=(i64)(db->nMaxSorterMmap) && nByte>pFile->nByte ){
         1063  +    sqlite3_file *pFd = pFile->pFd;
         1064  +    int rc = sqlite3OsTruncate(pFd, nByte);
  1023   1065       if( rc==SQLITE_OK ){
  1024   1066         void *p = 0;
  1025         -      sqlite3OsFetch(pFile, 0, nByte, &p);
  1026         -      sqlite3OsUnfetch(pFile, 0, p);
         1067  +      sqlite3OsFetch(pFd, 0, nByte, &p);
         1068  +      sqlite3OsUnfetch(pFd, 0, p);
         1069  +      pFile->nByte = nByte;
  1027   1070       }
  1028   1071     }
  1029   1072   }
  1030   1073   #else
  1031   1074   # define vdbeSorterExtendFile(x,y,z) SQLITE_OK
  1032   1075   #endif
  1033   1076   
  1034         -
  1035         -/*
  1036         -** Write the current contents of the in-memory linked-list to a PMA. Return
  1037         -** SQLITE_OK if successful, or an SQLite error code otherwise.
  1038         -**
  1039         -** The format of a PMA is:
  1040         -**
  1041         -**     * A varint. This varint contains the total number of bytes of content
  1042         -**       in the PMA (not including the varint itself).
  1043         -**
  1044         -**     * One or more records packed end-to-end in order of ascending keys. 
  1045         -**       Each record consists of a varint followed by a blob of data (the 
  1046         -**       key). The varint is the number of bytes in the blob of data.
  1047         -*/
  1048         -static int vdbeSorterListToPMA(SortSubtask *pTask){
  1049         -  int rc = SQLITE_OK;             /* Return code */
  1050         -  PmaWriter writer;               /* Object used to write to the file */
  1051         -
  1052         -  memset(&writer, 0, sizeof(PmaWriter));
  1053         -  assert( pTask->nInMemory>0 );
  1054         -
  1055         -  /* If the first temporary PMA file has not been opened, open it now. */
  1056         -  if( pTask->pTemp1==0 ){
  1057         -    rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTask->pTemp1);
  1058         -    assert( rc!=SQLITE_OK || pTask->pTemp1 );
  1059         -    assert( pTask->iTemp1Off==0 );
  1060         -    assert( pTask->nPMA==0 );
  1061         -  }
  1062         -
  1063         -  /* Try to get the file to memory map */
  1064         -  if( rc==SQLITE_OK ){
  1065         -    vdbeSorterExtendFile(pTask->db, 
  1066         -        pTask->pTemp1, pTask->iTemp1Off + pTask->nInMemory + 9
  1067         -    );
  1068         -  }
  1069         -
  1070         -  if( rc==SQLITE_OK ){
  1071         -    SorterRecord *p;
  1072         -    SorterRecord *pNext = 0;
  1073         -
  1074         -    vdbePmaWriterInit(pTask->pTemp1, &writer, pTask->pgsz,
  1075         -                      pTask->iTemp1Off);
  1076         -    pTask->nPMA++;
  1077         -    vdbePmaWriteVarint(&writer, pTask->nInMemory);
  1078         -    for(p=pTask->pList; p; p=pNext){
  1079         -      pNext = p->u.pNext;
  1080         -      vdbePmaWriteVarint(&writer, p->nVal);
  1081         -      vdbePmaWriteBlob(&writer, SRVAL(p), p->nVal);
  1082         -      if( pTask->aListMemory==0 ) sqlite3_free(p);
  1083         -    }
  1084         -    pTask->pList = p;
  1085         -    rc = vdbePmaWriterFinish(&writer, &pTask->iTemp1Off);
  1086         -  }
  1087         -
  1088         -  assert( pTask->pList==0 || rc!=SQLITE_OK );
  1089         -  return rc;
  1090         -}
  1091         -
  1092   1077   /*
  1093   1078   ** Advance the MergeEngine iterator passed as the second argument to
  1094   1079   ** the next entry. Set *pbEof to true if this means the iterator has 
  1095   1080   ** reached EOF.
  1096   1081   **
  1097   1082   ** Return SQLITE_OK if successful or an error code if an error occurs.
  1098   1083   */
  1099   1084   static int vdbeSorterNext(
  1100         -  SortSubtask *pTask, 
  1101         -  MergeEngine *pMerger, 
         1085  +  SortLevel *pLvl,
         1086  +  MergeEngine *pMerger,
  1102   1087     int *pbEof
  1103   1088   ){
  1104   1089     int rc;
  1105   1090     int iPrev = pMerger->aTree[1];/* Index of iterator to advance */
         1091  +  KeyInfo *pKeyInfo = pLvl->pTask->pKeyInfo;
         1092  +  UnpackedRecord *r2 = pLvl->pUnpacked;
  1106   1093   
  1107   1094     /* Advance the current iterator */
  1108   1095     rc = vdbePmaReaderNext(&pMerger->aIter[iPrev]);
  1109   1096   
  1110   1097     /* Update contents of aTree[] */
  1111   1098     if( rc==SQLITE_OK ){
  1112   1099       int i;                      /* Index of aTree[] to recalculate */
................................................................................
  1124   1111         /* Compare pIter1 and pIter2. Store the result in variable iRes. */
  1125   1112         int iRes;
  1126   1113         if( pIter1->pFile==0 ){
  1127   1114           iRes = +1;
  1128   1115         }else if( pIter2->pFile==0 ){
  1129   1116           iRes = -1;
  1130   1117         }else{
  1131         -        iRes = vdbeSorterCompare(pTask, 
         1118  +        iRes = vdbeSorterCompare(pKeyInfo, r2, 
  1132   1119               pIter1->aKey, pIter1->nKey, pKey2, pIter2->nKey
  1133   1120           );
  1134   1121         }
  1135   1122   
  1136   1123         /* If pIter1 contained the smaller value, set aTree[i] to its index.
  1137   1124         ** Then set pIter2 to the next iterator to compare to pIter1. In this
  1138         -      ** case there is no cache of pIter2 in pTask->pUnpacked, so set
         1125  +      ** case there is no cache of pIter2 in pLvl->pUnpacked, so set
  1139   1126         ** pKey2 to point to the record belonging to pIter2.
  1140   1127         **
  1141   1128         ** Alternatively, if pIter2 contains the smaller of the two values,
  1142   1129         ** set aTree[i] to its index and update pIter1. If vdbeSorterCompare()
  1143         -      ** was actually called above, then pTask->pUnpacked now contains
         1130  +      ** was actually called above, then pLvl->pUnpacked now contains
  1144   1131         ** a value equivalent to pIter2. So set pKey2 to NULL to prevent
  1145   1132         ** vdbeSorterCompare() from decoding pIter2 again.
  1146   1133         **
  1147   1134         ** If the two values were equal, then the value from the oldest
  1148   1135         ** PMA should be considered smaller. The VdbeSorter.aIter[] array
  1149   1136         ** is sorted from oldest to newest, so pIter1 contains older values
  1150   1137         ** than pIter2 iff (pIter1<pIter2).  */
................................................................................
  1160   1147       }
  1161   1148       *pbEof = (pMerger->aIter[pMerger->aTree[1]].pFile==0);
  1162   1149     }
  1163   1150   
  1164   1151     return rc;
  1165   1152   }
  1166   1153   
         1154  +static UnpackedRecord *vdbeSorterAllocUnpackedRecord(KeyInfo *pKeyInfo){
         1155  +  char *pFree;
         1156  +  UnpackedRecord *pRet;
         1157  +  pRet = sqlite3VdbeAllocUnpackedRecord(pKeyInfo, 0, 0, &pFree);
         1158  +  assert( pRet==(UnpackedRecord*)pFree );
         1159  +  if( pRet ){
         1160  +    pRet->nField = pKeyInfo->nField;
         1161  +    pRet->errCode = 0;
         1162  +  }
         1163  +  return pRet;
         1164  +}
         1165  +
         1166  +#if 0
         1167  +static void vdbeSorterWorkDebug(SortLevel *pLvl, const char *zEvent){
         1168  +  i64 t;
         1169  +  SortLevel *p;
         1170  +  SortSubtask *pTask = pLvl->pTask;
         1171  +  int iTask = (pTask - pTask->pSorter->aTask);
         1172  +  int iLvl = 0;
         1173  +  for(p=pTask->pLevel; p!=pLvl; p=p->pNext) iLvl++;
         1174  +  sqlite3OsCurrentTimeInt64(pTask->db->pVfs, &t);
         1175  +  fprintf(stderr, "%lld:%d.%d %s\n", t, iTask, iLvl, zEvent);
         1176  +}
         1177  +static void vdbeSorterRewindDebug(sqlite3 *db, const char *zEvent){
         1178  +  i64 t;
         1179  +  sqlite3OsCurrentTimeInt64(db->pVfs, &t);
         1180  +  fprintf(stderr, "%lld:X %s\n", t, zEvent);
         1181  +}
         1182  +#else
         1183  +# define vdbeSorterWorkDebug(x,y)
         1184  +# define vdbeSorterRewindDebug(x,y)
         1185  +#endif
         1186  +
  1167   1187   /*
  1168         -** The main routine for sorter-thread operations.
         1188  +** Merge the data currently stored in (pLevel->in), if any, into a new PMA 
         1189  +** stored within (pLevel->out).
  1169   1190   */
  1170         -static void *vdbeSortSubtaskMain(void *pCtx){
  1171         -  int rc = SQLITE_OK;
  1172         -  SortSubtask *pTask = (SortSubtask*)pCtx;
         1191  +static int vdbeSorterWorkLevel(SortLevel *pLvl){
         1192  +  int rc = SQLITE_OK;             /* Return code */
         1193  +  SortSubtask *pTask = pLvl->pTask;
         1194  +  MergeEngine *pMerger = 0;
         1195  +  SortFile *pOut = &pLvl->out;    /* Write new PMA here */
         1196  +  i64 nOut = 0;                   /* Expected size of new PMA */
         1197  +  PmaWriter writer;               /* Used to write new PMA to pOut */
         1198  +  int bEof = 0;
  1173   1199   
  1174         -  assert( pTask->eWork==SORT_SUBTASK_SORT
  1175         -       || pTask->eWork==SORT_SUBTASK_TO_PMA
  1176         -       || pTask->eWork==SORT_SUBTASK_CONS
  1177         -  );
  1178         -  assert( pTask->bDone==0 );
  1179         -
  1180         -  if( pTask->pUnpacked==0 ){
  1181         -    char *pFree;
  1182         -    pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord(
  1183         -        pTask->pKeyInfo, 0, 0, &pFree
  1184         -    );
  1185         -    assert( pTask->pUnpacked==(UnpackedRecord*)pFree );
  1186         -    if( pFree==0 ){
         1200  +  vdbeSorterWorkDebug(pLvl, "enter");
         1201  + 
         1202  +  if( pLvl->pUnpacked==0 ){
         1203  +    pLvl->pUnpacked = vdbeSorterAllocUnpackedRecord(pTask->pKeyInfo);
         1204  +    if( pLvl->pUnpacked==0 ){
  1187   1205         rc = SQLITE_NOMEM;
  1188         -      goto thread_out;
  1189         -    }
  1190         -    pTask->pUnpacked->nField = pTask->pKeyInfo->nField;
  1191         -    pTask->pUnpacked->errCode = 0;
  1192         -  }
  1193         -
  1194         -  if( pTask->eWork==SORT_SUBTASK_CONS ){
  1195         -    assert( pTask->pList==0 );
  1196         -    while( pTask->nPMA>pTask->nConsolidate && rc==SQLITE_OK ){
  1197         -      int nIter = MIN(pTask->nPMA, SORTER_MAX_MERGE_COUNT);
  1198         -      sqlite3_file *pTemp2 = 0;     /* Second temp file to use */
  1199         -      MergeEngine *pMerger;         /* Object for reading/merging PMA data */
  1200         -      i64 iReadOff = 0;             /* Offset in pTemp1 to read from */
  1201         -      i64 iWriteOff = 0;            /* Offset in pTemp2 to write to */
  1202         -      int i;
  1203         -      
  1204         -      /* Allocate a merger object to merge PMAs together. */
  1205         -      pMerger = vdbeMergeEngineNew(nIter);
  1206         -      if( pMerger==0 ){
  1207         -        rc = SQLITE_NOMEM;
  1208         -        break;
  1209         -      }
  1210         -
  1211         -      /* Open a second temp file to write merged data to */
  1212         -      rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTemp2);
  1213         -      if( rc==SQLITE_OK ){
  1214         -        vdbeSorterExtendFile(pTask->db, pTemp2, pTask->iTemp1Off);
  1215         -      }else{
  1216         -        vdbeMergeEngineFree(pMerger);
  1217         -        break;
  1218         -      }
  1219         -
  1220         -      /* This loop runs once for each output PMA. Each output PMA is made
  1221         -      ** of data merged from up to SORTER_MAX_MERGE_COUNT input PMAs. */
  1222         -      for(i=0; rc==SQLITE_OK && i<pTask->nPMA; i+=SORTER_MAX_MERGE_COUNT){
  1223         -        PmaWriter writer;         /* Object for writing data to pTemp2 */
  1224         -        i64 nOut = 0;             /* Bytes of data in output PMA */
  1225         -        int bEof = 0;
  1226         -        int rc2;
  1227         -
  1228         -        /* Configure the merger object to read and merge data from the next 
  1229         -        ** SORTER_MAX_MERGE_COUNT PMAs in pTemp1 (or from all remaining PMAs,
  1230         -        ** if that is fewer). */
  1231         -        int iIter;
  1232         -        for(iIter=0; iIter<SORTER_MAX_MERGE_COUNT; iIter++){
  1233         -          PmaReader *pIter = &pMerger->aIter[iIter];
  1234         -          rc = vdbePmaReaderInit(pTask, iReadOff, pIter, &nOut);
  1235         -          iReadOff = pIter->iEof;
  1236         -          if( iReadOff>=pTask->iTemp1Off || rc!=SQLITE_OK ) break;
  1237         -        }
  1238         -        for(iIter=pMerger->nTree-1; rc==SQLITE_OK && iIter>0; iIter--){
  1239         -          rc = vdbeSorterDoCompare(pTask, pMerger, iIter);
  1240         -        }
  1241         -
  1242         -        vdbePmaWriterInit(pTemp2, &writer, pTask->pgsz, iWriteOff);
  1243         -        vdbePmaWriteVarint(&writer, nOut);
  1244         -        while( rc==SQLITE_OK && bEof==0 ){
  1245         -          PmaReader *pIter = &pMerger->aIter[ pMerger->aTree[1] ];
  1246         -          assert( pIter->pFile!=0 );        /* pIter is not at EOF */
  1247         -          vdbePmaWriteVarint(&writer, pIter->nKey);
  1248         -          vdbePmaWriteBlob(&writer, pIter->aKey, pIter->nKey);
  1249         -          rc = vdbeSorterNext(pTask, pMerger, &bEof);
  1250         -        }
  1251         -        rc2 = vdbePmaWriterFinish(&writer, &iWriteOff);
  1252         -        if( rc==SQLITE_OK ) rc = rc2;
  1253         -      }
  1254         -
  1255         -      vdbeMergeEngineFree(pMerger);
  1256         -      sqlite3OsCloseFree(pTask->pTemp1);
  1257         -      pTask->pTemp1 = pTemp2;
  1258         -      pTask->nPMA = (i / SORTER_MAX_MERGE_COUNT);
  1259         -      pTask->iTemp1Off = iWriteOff;
         1206  +      goto work_level_out;
         1207  +    }
         1208  +  }
         1209  +
         1210  +  if( pLvl->out.pFd==0 ){
         1211  +    assert( pLvl->out.iOff==0 );
         1212  +    assert( pLvl->out.nByte==0 );
         1213  +    assert( pLvl->out.nPMA==0 );
         1214  +    rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pLvl->out.pFd);
         1215  +    if( rc!=SQLITE_OK ) goto work_level_out;
         1216  +  }
         1217  +
         1218  +  if( pLvl==pTask->pLevel ){
         1219  +    if( pLvl->in.l.pRecord==0 ){
         1220  +      bEof = 1;
         1221  +    }else{
         1222  +      rc = vdbeSorterSort(&pLvl->in.l, pTask->pKeyInfo, pLvl->pUnpacked);
         1223  +      nOut = pLvl->in.l.nInMemory;
  1260   1224       }
  1261   1225     }else{
  1262         -    /* Sort the pTask->pList list */
  1263         -    rc = vdbeSorterSort(pTask);
  1264         -
  1265         -    /* If required, write the list out to a PMA. */
  1266         -    if( rc==SQLITE_OK && pTask->eWork==SORT_SUBTASK_TO_PMA ){
  1267         -#ifdef SQLITE_DEBUG
  1268         -      i64 nExpect = pTask->nInMemory
  1269         -        + sqlite3VarintLen(pTask->nInMemory)
  1270         -        + pTask->iTemp1Off;
         1226  +    int nPMA = pLvl->in.f.nPMA;
         1227  +    if( nPMA==0 ){
         1228  +      bEof = 1;
         1229  +    }else{
         1230  +      pMerger = vdbeMergeEngineNew(nPMA);
         1231  +      if( pMerger==0 ){
         1232  +        rc = SQLITE_NOMEM;
         1233  +      }else{
         1234  +        /* Configure the merger object to read and merge data from all 
         1235  +         ** PMAs at pLvl.  */
         1236  +        int iIter;
         1237  +        i64 iReadOff = 0;
         1238  +        for(iIter=0; iIter<nPMA && rc==SQLITE_OK; iIter++){
         1239  +          PmaReader *pIter = &pMerger->aIter[iIter];
         1240  +          rc = vdbePmaReaderInit(pTask, &pLvl->in.f, iReadOff, pIter, &nOut);
         1241  +          iReadOff = pIter->iEof;
         1242  +        }
         1243  +
         1244  +        for(iIter=pMerger->nTree-1; rc==SQLITE_OK && iIter>0; iIter--){
         1245  +          rc = vdbeSorterDoCompare(pLvl, pMerger, iIter);
         1246  +        }
         1247  +      }
         1248  +    }
         1249  +  }
         1250  +  if( rc!=SQLITE_OK ) goto work_level_out;
         1251  +
         1252  +  /* If mmap is to be used, pre-extend and map the temp file. */
         1253  +  vdbeSorterExtendFile(pTask->db, &pLvl->out, pLvl->out.iOff + nOut + 9);
         1254  +
         1255  +  if( bEof==0 ){
         1256  +    vdbePmaWriterInit(pOut->pFd, &writer, pTask->pgsz, pOut->iOff);
         1257  +    vdbePmaWriteVarint(&writer, nOut);
         1258  +
         1259  +    while( rc==SQLITE_OK && bEof==0 ){
         1260  +      u8 *aKey;                     /* Next key to write to output */
         1261  +      int nKey;                     /* Size of aKey[] in bytes */
         1262  +      if( pMerger==0 ){
         1263  +        aKey = SRVAL(pLvl->in.l.pRecord);
         1264  +        nKey = pLvl->in.l.pRecord->nVal;
         1265  +      }else{
         1266  +        PmaReader *pIter = &pMerger->aIter[ pMerger->aTree[1] ];
         1267  +        assert( pIter->pFile );  /* pIter is not at EOF */
         1268  +        aKey = pIter->aKey;
         1269  +        nKey = pIter->nKey;
         1270  +      }
         1271  +
         1272  +      vdbePmaWriteVarint(&writer, nKey);
         1273  +      vdbePmaWriteBlob(&writer, aKey, nKey);
         1274  +
         1275  +      if( pMerger==0 ){
         1276  +        SorterRecord *pNext = pLvl->in.l.pRecord->u.pNext;
         1277  +        if( pLvl->in.l.aMemory==0 ) sqlite3_free(pLvl->in.l.pRecord);
         1278  +        pLvl->in.l.pRecord = pNext;
         1279  +        bEof = (pNext==0);
         1280  +      }else{
         1281  +        rc = vdbeSorterNext(pLvl, pMerger, &bEof);
         1282  +      }
         1283  +    }
         1284  +    rc = vdbePmaWriterFinish(&writer, &pOut->iOff);
         1285  +    pOut->nPMA++;
         1286  +
         1287  +    if( rc==SQLITE_OK && pMerger ){
         1288  +      sqlite3OsCloseFree(pLvl->in.f.pFd);
         1289  +      pLvl->in.f.pFd = 0;
         1290  +    }
         1291  +    vdbeMergeEngineFree(pMerger);
         1292  +  }
         1293  +
         1294  +  if( rc==SQLITE_OK && (
         1295  +      (pOut->nPMA>=SORTER_MAX_MERGE_COUNT)
         1296  +   || (pTask->nConsolidate && pLvl->pNext)
         1297  +   || (pTask->nConsolidate && pTask->nConsolidate<pOut->nPMA)
         1298  +  )){
         1299  +    SortLevel *pNext = pLvl->pNext;
         1300  +    if( pNext==0 ){
         1301  +      pNext = (SortLevel*)sqlite3_malloc(sizeof(SortLevel));
         1302  +      if( pNext==0 ){
         1303  +        rc = SQLITE_NOMEM;
         1304  +        goto work_level_out;
         1305  +      }
         1306  +      memset(pNext, 0, sizeof(SortLevel));
         1307  +      pLvl->pNext = pNext;
         1308  +      pNext->pTask = pTask;
         1309  +    }
         1310  +
         1311  +    /* If there is a thread running on the next level, block on it. */
         1312  +#if SQLITE_MAX_WORKER_THREADS>0
         1313  +    if( pNext->pThread ){
         1314  +      void *pRet;
         1315  +      rc = sqlite3ThreadJoin(pNext->pThread, &pRet);
         1316  +      pNext->pThread = 0;
         1317  +      pNext->bDone = 0;
         1318  +      if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet);
         1319  +      if( rc!=SQLITE_OK ) goto work_level_out;
         1320  +    }
  1271   1321   #endif
  1272         -      rc = vdbeSorterListToPMA(pTask);
  1273         -      assert( rc!=SQLITE_OK || (nExpect==pTask->iTemp1Off) );
  1274         -    }
         1322  +
         1323  +    pNext->in.f = pLvl->out;
         1324  +    memset(&pLvl->out, 0, sizeof(pLvl->out));
  1275   1325     }
  1276   1326   
  1277         - thread_out:
  1278         -  pTask->bDone = 1;
  1279         -  if( rc==SQLITE_OK && pTask->pUnpacked->errCode ){
  1280         -    assert( pTask->pUnpacked->errCode==SQLITE_NOMEM );
         1327  + work_level_out:
         1328  +  vdbeSorterWorkDebug(pLvl, "exit");
         1329  +  if( rc==SQLITE_OK && pLvl->pUnpacked->errCode ){
         1330  +    assert( pLvl->pUnpacked->errCode==SQLITE_NOMEM );
  1281   1331       rc = SQLITE_NOMEM;
  1282   1332     }
  1283         -  return SQLITE_INT_TO_PTR(rc);
         1333  +  return rc;
  1284   1334   }
  1285   1335   
  1286   1336   /*
  1287   1337   ** Run the activity scheduled by the object passed as the only argument
  1288   1338   ** in the current thread.
  1289   1339   */
  1290         -static int vdbeSorterRunTask(SortSubtask *pTask){
  1291         -  int rc = SQLITE_PTR_TO_INT( vdbeSortSubtaskMain((void*)pTask) );
  1292         -  assert( pTask->bDone );
  1293         -  pTask->bDone = 0;
         1340  +static int vdbeSorterRun(SortLevel *pLvl){
         1341  +  int rc;
         1342  +
         1343  +  assert( pLvl->bDone==0 );
         1344  +  assert( pLvl->pThread==0 );
         1345  +  while( 1 ){
         1346  +    rc = vdbeSorterWorkLevel(pLvl);
         1347  +    if( rc==SQLITE_OK && pLvl->pTask->pLevel==pLvl && pLvl->in.l.aMemory ){
         1348  +      assert( pLvl->pTask->pSorter->list.aMemory==0 );
         1349  +      assert( pLvl->in.l.pRecord==0 );
         1350  +      pLvl->pTask->pSorter->list.aMemory = pLvl->in.l.aMemory;
         1351  +      pLvl->in.l.aMemory = 0;
         1352  +    }
         1353  +
         1354  +    if( rc!=SQLITE_OK || pLvl->out.nPMA>0 ) break;
         1355  +    pLvl = pLvl->pNext;
         1356  +    assert( pLvl->bDone==0 );
         1357  +    assert( pLvl->pThread==0 );
         1358  +  }
         1359  +
         1360  +  pLvl->bDone = 0;
  1294   1361     return rc;
  1295   1362   }
         1363  +
         1364  +#if SQLITE_MAX_WORKER_THREADS>0
         1365  +static void *vdbeSorterThread(void *pCtx){
         1366  +  int rc;
         1367  +  SortLevel *pLvl = (SortLevel*)pCtx;
         1368  +
         1369  +  rc = vdbeSorterWorkLevel(pLvl);
         1370  +  if( rc==SQLITE_OK && pLvl->out.nPMA==0 ){
         1371  +    SortLevel *pNext = pLvl->pNext;
         1372  +    void *pCtx = (void*)pNext;
         1373  +    assert( pNext->pThread==0 );
         1374  +    rc = sqlite3ThreadCreate(&pNext->pThread, vdbeSorterThread, pCtx);
         1375  +  }
         1376  +
         1377  +  pLvl->bDone = 1;
         1378  +  return SQLITE_INT_TO_PTR(rc);
         1379  +}
         1380  +#endif
  1296   1381   
  1297   1382   /*
  1298   1383   ** Flush the current contents of VdbeSorter.pRecord to a new PMA, possibly
  1299   1384   ** using a background thread.
  1300   1385   **
  1301   1386   ** If argument bFg is non-zero, the operation always uses the calling thread.
  1302   1387   */
  1303   1388   static int vdbeSorterFlushPMA(sqlite3 *db, const VdbeCursor *pCsr, int bFg){
  1304   1389     VdbeSorter *pSorter = pCsr->pSorter;
  1305   1390     int rc = SQLITE_OK;
  1306   1391     int i;
  1307         -  SortSubtask *pTask = 0;    /* Thread context used to create new PMA */
  1308         -  int nWorker = (pSorter->nTask-1);
         1392  +  SortSubtask *pTask = 0;    /* Sub-task new PMA is written to */
         1393  +  SortLevel *pLevel;         /* Level to write to */
  1309   1394   
         1395  +  /* Set the use-temp-files flag. */
  1310   1396     pSorter->bUsePMA = 1;
  1311         -  for(i=0; i<nWorker; i++){
  1312         -    int iTest = (pSorter->iPrev + i + 1) % nWorker;
  1313         -    pTask = &pSorter->aTask[iTest];
  1314         -#if SQLITE_MAX_WORKER_THREADS>0
  1315         -    if( pTask->bDone ){
  1316         -      void *pRet;
  1317         -      assert( pTask->pThread );
  1318         -      rc = sqlite3ThreadJoin(pTask->pThread, &pRet);
  1319         -      pTask->pThread = 0;
  1320         -      pTask->bDone = 0;
  1321         -      if( rc==SQLITE_OK ){
  1322         -        rc = SQLITE_PTR_TO_INT(pRet);
  1323         -      }
         1397  +
         1398  +  /* Select one of the sub-tasks to flush this PMA. In single threaded
         1399  +  ** mode (pSorter->nTask==1), this is always aTask[0]. In multi-threaded mode,
         1400  +  ** it may be any of the pSorter->nTask sub-tasks.  */
         1401  +  for(i=0; i<pSorter->nTask; i++){
         1402  +    pTask = &pSorter->aTask[i];
         1403  +    if( pTask->pLevel==0 
         1404  +     || pTask->pLevel->pThread==0 
         1405  +     || pTask->pLevel->bDone 
         1406  +    ){
         1407  +      break;
         1408  +    }
         1409  +  }
         1410  +
         1411  +  /* If the first level for this task has not been allocated, allocate it. */
         1412  +  if( pTask->pLevel==0 ){
         1413  +    SortLevel *pNew = (SortLevel*)sqlite3_malloc(sizeof(SortLevel));
         1414  +    if( pNew==0 ){
         1415  +      rc = SQLITE_NOMEM;
         1416  +    }else{
         1417  +      memset(pNew, 0, sizeof(SortLevel));
         1418  +      pNew->pTask = pTask;
         1419  +      pTask->pLevel = pNew;
  1324   1420       }
         1421  +  }
         1422  +  pLevel = pTask->pLevel;
         1423  +
         1424  +  /* If there is a background thread using the selected task, wait for
         1425  +  ** it to finish. */
         1426  +#if SQLITE_MAX_WORKER_THREADS>0
         1427  +  if( rc==SQLITE_OK && pLevel->pThread ){
         1428  +    void *pRet = 0;
         1429  +    rc = sqlite3ThreadJoin(pLevel->pThread, &pRet);
         1430  +    pLevel->pThread = 0;
         1431  +    pLevel->bDone = 0;
         1432  +    if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet);
         1433  +  }
  1325   1434   #endif
  1326         -    if( pTask->pThread==0 ) break;
  1327         -    pTask = 0;
  1328         -  }
  1329         -  if( pTask==0 ){
  1330         -    pTask = &pSorter->aTask[nWorker];
  1331         -  }
  1332         -  pSorter->iPrev = (pTask - pSorter->aTask);
  1333   1435   
  1334   1436     if( rc==SQLITE_OK ){
  1335         -    assert( pTask->pThread==0 && pTask->bDone==0 );
  1336         -    pTask->eWork = SORT_SUBTASK_TO_PMA;
  1337         -    pTask->pList = pSorter->pRecord;
  1338         -    pTask->nInMemory = pSorter->nInMemory;
  1339         -    pSorter->nInMemory = 0;
  1340         -    pSorter->pRecord = 0;
  1341         -
  1342         -    if( pSorter->aMemory ){
  1343         -      u8 *aMem = pTask->aListMemory;
  1344         -      pTask->aListMemory = pSorter->aMemory;
  1345         -      pSorter->aMemory = aMem;
         1437  +    u8 *aNewMem = 0;
         1438  +    if( pSorter->list.aMemory && pSorter->nTask>1 ){
         1439  +      aNewMem = pLevel->in.l.aMemory;
         1440  +      if( aNewMem==0 ){
         1441  +        aNewMem = sqlite3_malloc(pSorter->mxPmaSize);
         1442  +        if( aNewMem==0 ) rc = SQLITE_NOMEM;
         1443  +      }
  1346   1444       }
  1347         -
         1445  +    assert( pLevel->in.l.pRecord==0 );
         1446  +    pLevel->in.l = pSorter->list;
         1447  +    pSorter->list.pRecord = 0;
         1448  +    pSorter->list.nInMemory = 0;
         1449  +    pSorter->list.aMemory = aNewMem;
         1450  +    if( rc==SQLITE_OK ){
  1348   1451   #if SQLITE_MAX_WORKER_THREADS>0
  1349         -    if( !bFg && pTask!=&pSorter->aTask[nWorker] ){
  1350         -      /* Launch a background thread for this operation */
  1351         -      void *pCtx = (void*)pTask;
  1352         -      assert( pSorter->aMemory==0 || pTask->aListMemory!=0 );
  1353         -      if( pTask->aListMemory ){
  1354         -        if( pSorter->aMemory==0 ){
  1355         -          pSorter->aMemory = sqlite3Malloc(pSorter->nMemory);
  1356         -          if( pSorter->aMemory==0 ) return SQLITE_NOMEM;
  1357         -        }else{
  1358         -          pSorter->nMemory = sqlite3MallocSize(pSorter->aMemory);
  1359         -        }
  1360         -      }
  1361         -      rc = sqlite3ThreadCreate(&pTask->pThread, vdbeSortSubtaskMain, pCtx);
  1362         -    }else
         1452  +      if( pSorter->nTask>1 ){
         1453  +        void *pCtx = (void*)pLevel;
         1454  +        rc = sqlite3ThreadCreate(&pLevel->pThread, vdbeSorterThread, pCtx);
         1455  +        pSorter->nMemory = aNewMem ? sqlite3MallocSize(aNewMem) : 0;
         1456  +      }else
  1363   1457   #endif
  1364         -    {
  1365         -      /* Use the foreground thread for this operation */
  1366         -      rc = vdbeSorterRunTask(pTask);
  1367         -      if( rc==SQLITE_OK ){
  1368         -        u8 *aMem = pTask->aListMemory;
  1369         -        pTask->aListMemory = pSorter->aMemory;
  1370         -        pSorter->aMemory = aMem;
  1371         -        assert( pTask->pList==0 );
         1458  +      {
         1459  +        rc = vdbeSorterRun(pLevel);
  1372   1460         }
  1373   1461       }
  1374   1462     }
  1375   1463   
  1376   1464     return rc;
  1377   1465   }
  1378   1466   
................................................................................
  1409   1497     **
  1410   1498     **   * The total memory allocated for the in-memory list is greater 
  1411   1499     **     than (page-size * 10) and sqlite3HeapNearlyFull() returns true.
  1412   1500     */
  1413   1501     nReq = pVal->n + sizeof(SorterRecord);
  1414   1502     nPMA = pVal->n + sqlite3VarintLen(pVal->n);
  1415   1503     if( pSorter->mxPmaSize ){
  1416         -    if( pSorter->aMemory ){
         1504  +    if( pSorter->list.aMemory ){
  1417   1505         bFlush = pSorter->iMemory && (pSorter->iMemory+nReq) > pSorter->mxPmaSize;
  1418   1506       }else{
         1507  +      int nInMemory = pSorter->list.nInMemory;
  1419   1508         bFlush = (
  1420         -          (pSorter->nInMemory > pSorter->mxPmaSize)
  1421         -       || (pSorter->nInMemory > pSorter->mnPmaSize && sqlite3HeapNearlyFull())
         1509  +          (nInMemory > pSorter->mxPmaSize)
         1510  +       || (nInMemory > pSorter->mnPmaSize && sqlite3HeapNearlyFull())
  1422   1511         );
  1423   1512       }
  1424   1513       if( bFlush ){
  1425   1514         rc = vdbeSorterFlushPMA(db, pCsr, 0);
  1426         -      pSorter->nInMemory = 0;
         1515  +      pSorter->list.nInMemory = 0;
  1427   1516         pSorter->iMemory = 0;
  1428         -      assert( rc!=SQLITE_OK || pSorter->pRecord==0 );
         1517  +      assert( rc!=SQLITE_OK || pSorter->list.pRecord==0 );
  1429   1518       }
  1430   1519     }
  1431   1520   
  1432         -  pSorter->nInMemory += nPMA;
         1521  +  pSorter->list.nInMemory += nPMA;
  1433   1522   
  1434         -  if( pSorter->aMemory ){
         1523  +  if( pSorter->list.aMemory ){
  1435   1524       int nMin = pSorter->iMemory + nReq;
  1436   1525   
  1437   1526       if( nMin>pSorter->nMemory ){
  1438   1527         u8 *aNew;
  1439   1528         int nNew = pSorter->nMemory * 2;
  1440   1529         while( nNew < nMin ) nNew = nNew*2;
  1441   1530         if( nNew > pSorter->mxPmaSize ) nNew = pSorter->mxPmaSize;
  1442   1531         if( nNew < nMin ) nNew = nMin;
  1443   1532   
  1444         -      aNew = sqlite3Realloc(pSorter->aMemory, nNew);
         1533  +      aNew = sqlite3Realloc(pSorter->list.aMemory, nNew);
  1445   1534         if( !aNew ) return SQLITE_NOMEM;
  1446         -      pSorter->pRecord = (SorterRecord*)(
  1447         -          aNew + ((u8*)pSorter->pRecord - pSorter->aMemory)
         1535  +      pSorter->list.pRecord = (SorterRecord*)(
         1536  +          aNew + ((u8*)pSorter->list.pRecord - pSorter->list.aMemory)
  1448   1537         );
  1449         -      pSorter->aMemory = aNew;
         1538  +      pSorter->list.aMemory = aNew;
  1450   1539         pSorter->nMemory = nNew;
  1451   1540       }
  1452   1541   
  1453         -    pNew = (SorterRecord*)&pSorter->aMemory[pSorter->iMemory];
         1542  +    pNew = (SorterRecord*)&pSorter->list.aMemory[pSorter->iMemory];
  1454   1543       pSorter->iMemory += ROUND8(nReq);
  1455         -    pNew->u.iNext = (u8*)(pSorter->pRecord) - pSorter->aMemory;
         1544  +    pNew->u.iNext = (u8*)(pSorter->list.pRecord) - pSorter->list.aMemory;
  1456   1545     }else{
  1457   1546       pNew = (SorterRecord *)sqlite3Malloc(nReq);
  1458   1547       if( pNew==0 ){
  1459   1548         return SQLITE_NOMEM;
  1460   1549       }
  1461         -    pNew->u.pNext = pSorter->pRecord;
         1550  +    pNew->u.pNext = pSorter->list.pRecord;
  1462   1551     }
  1463   1552   
  1464   1553     memcpy(SRVAL(pNew), pVal->z, pVal->n);
  1465   1554     pNew->nVal = pVal->n;
  1466         -  pSorter->pRecord = pNew;
         1555  +  pSorter->list.pRecord = pNew;
  1467   1556   
  1468   1557     return rc;
  1469   1558   }
  1470   1559   
  1471         -/*
  1472         -** Return the total number of PMAs in all temporary files.
  1473         -*/
  1474         -static int vdbeSorterCountPMA(VdbeSorter *pSorter){
  1475         -  int nPMA = 0;
  1476         -  int i;
  1477         -  for(i=0; i<pSorter->nTask; i++){
  1478         -    nPMA += pSorter->aTask[i].nPMA;
  1479         -  }
  1480         -  return nPMA;
  1481         -}
  1482   1560   
  1483   1561   /*
  1484   1562   ** Once the sorter has been populated by calls to sqlite3VdbeSorterWrite,
  1485   1563   ** this function is called to prepare for iterating through the records
  1486   1564   ** in sorted order.
  1487   1565   */
  1488   1566   int sqlite3VdbeSorterRewind(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){
  1489   1567     VdbeSorter *pSorter = pCsr->pSorter;
  1490   1568     int rc = SQLITE_OK;             /* Return code */
         1569  +  int nTask = 0;
         1570  +  int i;
  1491   1571   
  1492   1572     assert( pSorter );
  1493   1573   
  1494   1574     /* If no data has been written to disk, then do not do so now. Instead,
  1495         -  ** sort the VdbeSorter.pRecord list. The vdbe layer will read data directly
  1496         -  ** from the in-memory list.  */
         1575  +  ** sort the VdbeSorter.list.pRecord list. The vdbe layer will read data 
         1576  +  ** directly from the in-memory list.  */
         1577  +  *pbEof = 0;
  1497   1578     if( pSorter->bUsePMA==0 ){
  1498         -    if( pSorter->pRecord ){
         1579  +    if( pSorter->list.pRecord ){
  1499   1580         SortSubtask *pTask = &pSorter->aTask[0];
  1500         -      *pbEof = 0;
  1501         -      pTask->pList = pSorter->pRecord;
  1502         -      pTask->eWork = SORT_SUBTASK_SORT;
  1503         -      assert( pTask->aListMemory==0 );
  1504         -      pTask->aListMemory = pSorter->aMemory;
  1505         -      rc = vdbeSorterRunTask(pTask);
  1506         -      pTask->aListMemory = 0;
  1507         -      pSorter->pRecord = pTask->pList;
  1508         -      pTask->pList = 0;
         1581  +      UnpackedRecord *pUnpack = vdbeSorterAllocUnpackedRecord(pTask->pKeyInfo);
         1582  +      if( pUnpack==0 ) return SQLITE_NOMEM;
         1583  +      rc = vdbeSorterSort(&pSorter->list, pTask->pKeyInfo, pUnpack);
         1584  +      sqlite3DbFree(db, pUnpack);
  1509   1585       }else{
  1510   1586         *pbEof = 1;
  1511   1587       }
  1512   1588       return rc;
  1513   1589     }
  1514   1590   
  1515   1591     /* Write the current in-memory list to a PMA. */
  1516         -  if( pSorter->pRecord ){
  1517         -    rc = vdbeSorterFlushPMA(db, pCsr, 1);
         1592  +  if( pSorter->list.pRecord ){
         1593  +    rc = vdbeSorterFlushPMA(db, pCsr, 0);
  1518   1594     }
  1519   1595   
  1520   1596     /* Join all threads */
  1521   1597     rc = vdbeSorterJoinAll(pSorter, rc);
  1522   1598   
  1523         -  /* If there are more than SORTER_MAX_MERGE_COUNT PMAs on disk, merge
  1524         -  ** some of them together so that this is no longer the case. */
  1525         -  if( vdbeSorterCountPMA(pSorter)>SORTER_MAX_MERGE_COUNT ){
  1526         -    int i;
  1527         -    for(i=0; rc==SQLITE_OK && i<pSorter->nTask; i++){
  1528         -      SortSubtask *pTask = &pSorter->aTask[i];
  1529         -      if( pTask->pTemp1 ){
  1530         -        pTask->nConsolidate = SORTER_MAX_MERGE_COUNT / pSorter->nTask;
  1531         -        pTask->eWork = SORT_SUBTASK_CONS;
         1599  +  vdbeSorterRewindDebug(db, "rewind");
  1532   1600   
         1601  +  for(i=0; i<pSorter->nTask; i++){
         1602  +    if( pSorter->aTask[i].pLevel ) nTask++;
         1603  +  }
         1604  +
         1605  +  for(i=0; rc==SQLITE_OK && i<pSorter->nTask; i++){
         1606  +    SortSubtask *pTask = &pSorter->aTask[i];
         1607  +    if( pTask->pLevel ){
         1608  +      SortLevel *pLvl = pTask->pLevel;
         1609  +      pTask->nConsolidate = (SORTER_MAX_MERGE_COUNT / nTask);
  1533   1610   #if SQLITE_MAX_WORKER_THREADS>0
  1534         -        if( i<(pSorter->nTask-1) ){
  1535         -          void *pCtx = (void*)pTask;
  1536         -          rc = sqlite3ThreadCreate(&pTask->pThread, vdbeSortSubtaskMain, pCtx);
  1537         -        }else
         1611  +      if( i<(pSorter->nTask-1) ){
         1612  +        void *pCtx = (void*)pLvl;
         1613  +        rc = sqlite3ThreadCreate(&pLvl->pThread, vdbeSorterThread, pCtx);
         1614  +      }else
  1538   1615   #endif
  1539         -        {
  1540         -          rc = vdbeSorterRunTask(pTask);
  1541         -        }
         1616  +      {
         1617  +        assert( pLvl->pThread==0 );
         1618  +        rc = vdbeSorterRun(pLvl);
  1542   1619         }
  1543   1620       }
  1544   1621     }
  1545   1622   
  1546   1623     /* Join all threads */
  1547   1624     rc = vdbeSorterJoinAll(pSorter, rc);
  1548   1625   
................................................................................
  1550   1627     ** and merge all remaining PMAs.  */
  1551   1628     assert( pSorter->pMerger==0 );
  1552   1629     if( rc==SQLITE_OK ){
  1553   1630       int nIter = 0;                /* Number of iterators used */
  1554   1631       int i;
  1555   1632       MergeEngine *pMerger;
  1556   1633       for(i=0; i<pSorter->nTask; i++){
  1557         -      nIter += pSorter->aTask[i].nPMA;
         1634  +      SortSubtask *pTask = &pSorter->aTask[i];
         1635  +      if( pTask->pLevel ){
         1636  +        SortLevel *pLvl;
         1637  +        for(pLvl=pTask->pLevel; pLvl->pNext; pLvl=pLvl->pNext){
         1638  +          assert( pLvl->out.nPMA==0 );
         1639  +        }
         1640  +        nIter += pLvl->out.nPMA;
         1641  +      }
  1558   1642       }
  1559   1643   
  1560   1644       pSorter->pMerger = pMerger = vdbeMergeEngineNew(nIter);
  1561   1645       if( pMerger==0 ){
  1562   1646         rc = SQLITE_NOMEM;
  1563   1647       }else{
  1564   1648         int iIter = 0;
  1565         -      int iThread = 0;
  1566         -      for(iThread=0; iThread<pSorter->nTask; iThread++){
  1567         -        int iPMA;
  1568         -        i64 iReadOff = 0;
  1569         -        SortSubtask *pTask = &pSorter->aTask[iThread];
  1570         -        for(iPMA=0; iPMA<pTask->nPMA && rc==SQLITE_OK; iPMA++){
  1571         -          i64 nDummy = 0;
  1572         -          PmaReader *pIter = &pMerger->aIter[iIter++];
  1573         -          rc = vdbePmaReaderInit(pTask, iReadOff, pIter, &nDummy);
  1574         -          iReadOff = pIter->iEof;
         1649  +      for(i=0; i<pSorter->nTask; i++){
         1650  +        SortSubtask *pTask = &pSorter->aTask[i];
         1651  +        if( pTask->pLevel ){
         1652  +          int iPMA;
         1653  +          i64 iReadOff = 0;
         1654  +          SortLevel *pLvl;
         1655  +          for(pLvl=pTask->pLevel; pLvl->pNext; pLvl=pLvl->pNext);
         1656  +
         1657  +          for(iPMA=0; iPMA<pLvl->out.nPMA && rc==SQLITE_OK; iPMA++){
         1658  +            i64 nDummy = 0;
         1659  +            PmaReader *pIter = &pMerger->aIter[iIter++];
         1660  +            rc = vdbePmaReaderInit(pTask, &pLvl->out, iReadOff, pIter, &nDummy);
         1661  +            iReadOff = pIter->iEof;
         1662  +          }
  1575   1663           }
  1576   1664         }
  1577   1665   
  1578   1666         for(i=pMerger->nTree-1; rc==SQLITE_OK && i>0; i--){
  1579         -        rc = vdbeSorterDoCompare(&pSorter->aTask[0], pMerger, i);
         1667  +        rc = vdbeSorterDoCompare(pSorter->aTask[0].pLevel, pMerger, i);
  1580   1668         }
  1581   1669       }
  1582   1670     }
  1583   1671   
  1584   1672     if( rc==SQLITE_OK ){
  1585   1673       *pbEof = (pSorter->pMerger->aIter[pSorter->pMerger->aTree[1]].pFile==0);
  1586   1674     }
         1675  +  vdbeSorterRewindDebug(db, "rewinddone");
  1587   1676     return rc;
  1588   1677   }
  1589   1678   
  1590   1679   /*
  1591   1680   ** Advance to the next element in the sorter.
  1592   1681   */
  1593   1682   int sqlite3VdbeSorterNext(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){
  1594   1683     VdbeSorter *pSorter = pCsr->pSorter;
  1595   1684     int rc;                         /* Return code */
  1596   1685   
  1597   1686     if( pSorter->pMerger ){
  1598         -    rc = vdbeSorterNext(&pSorter->aTask[0], pSorter->pMerger, pbEof);
         1687  +    rc = vdbeSorterNext(pSorter->aTask[0].pLevel, pSorter->pMerger, pbEof);
  1599   1688     }else{
  1600         -    SorterRecord *pFree = pSorter->pRecord;
  1601         -    pSorter->pRecord = pFree->u.pNext;
         1689  +    SorterRecord *pFree = pSorter->list.pRecord;
         1690  +    pSorter->list.pRecord = pFree->u.pNext;
  1602   1691       pFree->u.pNext = 0;
  1603         -    if( pSorter->aMemory==0 ) vdbeSorterRecordFree(db, pFree);
  1604         -    *pbEof = !pSorter->pRecord;
         1692  +    if( pSorter->list.aMemory==0 ) vdbeSorterRecordFree(db, pFree);
         1693  +    *pbEof = !pSorter->list.pRecord;
  1605   1694       rc = SQLITE_OK;
  1606   1695     }
  1607   1696     return rc;
  1608   1697   }
  1609   1698   
  1610   1699   /*
  1611   1700   ** Return a pointer to a buffer owned by the sorter that contains the 
................................................................................
  1618   1707     void *pKey;
  1619   1708     if( pSorter->pMerger ){
  1620   1709       PmaReader *pIter;
  1621   1710       pIter = &pSorter->pMerger->aIter[ pSorter->pMerger->aTree[1] ];
  1622   1711       *pnKey = pIter->nKey;
  1623   1712       pKey = pIter->aKey;
  1624   1713     }else{
  1625         -    *pnKey = pSorter->pRecord->nVal;
  1626         -    pKey = SRVAL(pSorter->pRecord);
         1714  +    *pnKey = pSorter->list.pRecord->nVal;
         1715  +    pKey = SRVAL(pSorter->list.pRecord);
  1627   1716     }
  1628   1717     return pKey;
  1629   1718   }
  1630   1719   
  1631   1720   /*
  1632   1721   ** Copy the current sorter key into the memory cell pOut.
  1633   1722   */
................................................................................
  1665   1754   int sqlite3VdbeSorterCompare(
  1666   1755     const VdbeCursor *pCsr,         /* Sorter cursor */
  1667   1756     Mem *pVal,                      /* Value to compare to current sorter key */
  1668   1757     int nIgnore,                    /* Ignore this many fields at the end */
  1669   1758     int *pRes                       /* OUT: Result of comparison */
  1670   1759   ){
  1671   1760     VdbeSorter *pSorter = pCsr->pSorter;
  1672         -  UnpackedRecord *r2 = pSorter->aTask[0].pUnpacked;
         1761  +  UnpackedRecord *r2 = pSorter->pUnpacked;
  1673   1762     KeyInfo *pKeyInfo = pCsr->pKeyInfo;
  1674   1763     int i;
  1675   1764     void *pKey; int nKey;           /* Sorter key to compare pVal with */
  1676   1765   
  1677         -  assert( r2->nField>=pKeyInfo->nField-nIgnore );
  1678         -  r2->nField = pKeyInfo->nField-nIgnore;
         1766  +  if( r2==0 ){
         1767  +    r2 = vdbeSorterAllocUnpackedRecord(pSorter->aTask[0].pKeyInfo);
         1768  +    if( r2==0 ) return SQLITE_NOMEM;
         1769  +    pSorter->pUnpacked = r2;
         1770  +    assert( r2->nField>=pKeyInfo->nField-nIgnore );
         1771  +    r2->nField = pKeyInfo->nField-nIgnore;
         1772  +  }
         1773  +  assert( r2->nField==pKeyInfo->nField-nIgnore );
  1679   1774   
  1680   1775     pKey = vdbeSorterRowkey(pSorter, &nKey);
  1681   1776     sqlite3VdbeRecordUnpack(pKeyInfo, nKey, pKey, r2);
  1682   1777     for(i=0; i<r2->nField; i++){
  1683   1778       if( r2->aMem[i].flags & MEM_Null ){
  1684   1779         *pRes = -1;
  1685   1780         return SQLITE_OK;
  1686   1781       }
  1687   1782     }
  1688   1783   
  1689   1784     *pRes = sqlite3VdbeRecordCompare(pVal->n, pVal->z, r2, 0);
  1690   1785     return SQLITE_OK;
  1691   1786   }