SQLite

Check-in [4ee2d910fb]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Change the name of the SorterThread object to "SortSubtask" to avoid confusion with the SQLiteThread object.
Downloads: Tarball | ZIP archive
Timelines: family | ancestors | descendants | both | threads
Files: files | file ages | folders
SHA1: 4ee2d910fbbed8d4def15e4e99ee225839f3a739
User & Date: drh 2014-04-02 14:38:14.734
Context
2014-04-02
15:15
Fix some problems with OOM handling in vdbesort.c. (check-in: 47e702bd83 user: dan tags: threads)
14:38
Change the name of the SorterThread object to "SortSubtask" to avoid confusion with the SQLiteThread object. (check-in: 4ee2d910fb user: drh tags: threads)
2014-04-01
18:41
When sorting data for a CREATE INDEX statement in single-threaded mode, assume that keys are delivered to the sorter in primary key order. Also fix various comments that had fallen out of date. (check-in: 821d1ac450 user: dan tags: threads)
Changes
Unified Diff Ignore Whitespace Patch
Changes to src/vdbesort.c.
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
*/

#include "sqliteInt.h"
#include "vdbeInt.h"


typedef struct VdbeSorterIter VdbeSorterIter;
typedef struct SorterThread SorterThread;
typedef struct SorterRecord SorterRecord;
typedef struct SorterMerger SorterMerger;
typedef struct FileWriter FileWriter;


/*
** Candidate values for SorterThread.eWork
*/
#define SORTER_THREAD_SORT   1
#define SORTER_THREAD_TO_PMA 2
#define SORTER_THREAD_CONS   3

/*
** Much of the work performed in this module to sort the list of records is 
** broken down into smaller units that may be peformed in parallel. In order
** to perform such a unit of work, an instance of the following structure
** is configured and passed to vdbeSorterThreadMain() - either directly by 
** the main thread or via a background thread.
**
** Exactly SorterThread.nThread instances of this structure are allocated
** as part of each VdbeSorter object. Instances are never allocated any other
** way. SorterThread.nThread is set to the number of worker threads allowed
** (see SQLITE_CONFIG_WORKER_THREADS) plus one (the main thread).
**
** When a background thread is launched to perform work, SorterThread.bDone
** is set to 0 and the SorterThread.pThread variable set to point to the
** thread handle. SorterThread.bDone is set to 1 (to indicate to the main
** thread that joining SorterThread.pThread will not block) before the thread
** exits. SorterThread.pThread and bDone are always cleared after the 
** background thread has been joined.
**
** One object (specifically, VdbeSorter.aThread[SorterThread.nThread-1])
** is reserved for the foreground thread.
**
** The nature of the work performed is determined by SorterThread.eWork,
** as follows:
**
**   SORTER_THREAD_SORT:
**     Sort the linked list of records at SorterThread.pList.
**
**   SORTER_THREAD_TO_PMA:
**     Sort the linked list of records at SorterThread.pList, and write
**     the results to a new PMA in temp file SorterThread.pTemp1. Open
**     the temp file if it is not already open.
**
**   SORTER_THREAD_CONS:
**     Merge existing PMAs until SorterThread.nConsolidate or fewer
**     remain in temp file SorterThread.pTemp1.
*/
struct SorterThread {
  SQLiteThread *pThread;          /* Thread handle, or NULL */
  int bDone;                      /* Set to true by pThread when finished */

  sqlite3_vfs *pVfs;              /* VFS used to open temporary files */
  KeyInfo *pKeyInfo;              /* How to compare records */
  UnpackedRecord *pUnpacked;      /* Space to unpack a record */
  int pgsz;                       /* Main database page size */







|






|

|
|
|





|


|

|


|
|
|
|
|


|


|



|


|
|



|
|

|







16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
*/

#include "sqliteInt.h"
#include "vdbeInt.h"


typedef struct VdbeSorterIter VdbeSorterIter;
typedef struct SortSubtask SortSubtask;
typedef struct SorterRecord SorterRecord;
typedef struct SorterMerger SorterMerger;
typedef struct FileWriter FileWriter;


/*
** Candidate values for SortSubtask.eWork
*/
#define SORTER_THREAD_SORT   1  /* Sort records on pList */
#define SORTER_THREAD_TO_PMA 2  /* Xfer pList to Packed-Memory-Array pFile */
#define SORTER_THREAD_CONS   3  /* Consolidate multiple PMAs */

/*
** Much of the work performed in this module to sort the list of records is 
** broken down into smaller units that may be peformed in parallel. In order
** to perform such a unit of work, an instance of the following structure
** is configured and passed to vdbeSortSubtaskMain() - either directly by 
** the main thread or via a background thread.
**
** Exactly SortSubtask.nThread instances of this structure are allocated
** as part of each VdbeSorter object. Instances are never allocated any other
** way. SortSubtask.nThread is set to the number of worker threads allowed
** (see SQLITE_CONFIG_WORKER_THREADS) plus one (the main thread).
**
** When a background thread is launched to perform work, SortSubtask.bDone
** is set to 0 and the SortSubtask.pThread variable set to point to the
** thread handle. SortSubtask.bDone is set to 1 (to indicate to the main
** thread that joining SortSubtask.pThread will not block) before the thread
** exits. SortSubtask.pThread and bDone are always cleared after the 
** background thread has been joined.
**
** One object (specifically, VdbeSorter.aThread[SortSubtask.nThread-1])
** is reserved for the foreground thread.
**
** The nature of the work performed is determined by SortSubtask.eWork,
** as follows:
**
**   SORTER_THREAD_SORT:
**     Sort the linked list of records at SortSubtask.pList.
**
**   SORTER_THREAD_TO_PMA:
**     Sort the linked list of records at SortSubtask.pList, and write
**     the results to a new PMA in temp file SortSubtask.pTemp1. Open
**     the temp file if it is not already open.
**
**   SORTER_THREAD_CONS:
**     Merge existing PMAs until SortSubtask.nConsolidate or fewer
**     remain in temp file SortSubtask.pTemp1.
*/
struct SortSubtask {
  SQLiteThread *pThread;          /* Thread handle, or NULL */
  int bDone;                      /* Set to true by pThread when finished */

  sqlite3_vfs *pVfs;              /* VFS used to open temporary files */
  KeyInfo *pKeyInfo;              /* How to compare records */
  UnpackedRecord *pUnpacked;      /* Space to unpack a record */
  int pgsz;                       /* Main database page size */
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
  SorterRecord *pRecord;          /* Head of in-memory record list */
  SorterMerger *pMerger;          /* For final merge of PMAs (by caller) */ 
  u8 *aMemory;                    /* Block of memory to alloc records from */
  int iMemory;                    /* Offset of first free byte in aMemory */
  int nMemory;                    /* Size of aMemory allocation in bytes */
  int iPrev;                      /* Previous thread used to flush PMA */
  int nThread;                    /* Size of aThread[] array */
  SorterThread aThread[1];
};

/*
** The following type is an iterator for a PMA. It caches the current key in 
** variables nKey/aKey. If the iterator is at EOF, pFile==0.
*/
struct VdbeSorterIter {







|







178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
  SorterRecord *pRecord;          /* Head of in-memory record list */
  SorterMerger *pMerger;          /* For final merge of PMAs (by caller) */ 
  u8 *aMemory;                    /* Block of memory to alloc records from */
  int iMemory;                    /* Offset of first free byte in aMemory */
  int nMemory;                    /* Size of aMemory allocation in bytes */
  int iPrev;                      /* Previous thread used to flush PMA */
  int nThread;                    /* Size of aThread[] array */
  SortSubtask aThread[1];
};

/*
** The following type is an iterator for a PMA. It caches the current key in 
** variables nKey/aKey. If the iterator is at EOF, pFile==0.
*/
struct VdbeSorterIter {
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
/*
** Initialize iterator pIter to scan through the PMA stored in file pFile
** starting at offset iStart and ending at offset iEof-1. This function 
** leaves the iterator pointing to the first key in the PMA (or EOF if the 
** PMA is empty).
*/
static int vdbeSorterIterInit(
  SorterThread *pThread,          /* Thread context */
  i64 iStart,                     /* Start offset in pThread->pTemp1 */
  VdbeSorterIter *pIter,          /* Iterator to populate */
  i64 *pnByte                     /* IN/OUT: Increment this value by PMA size */
){
  int rc = SQLITE_OK;
  int nBuf = pThread->pgsz;
  void *pMap = 0;                 /* Mapping of temp file */







|







423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
/*
** Initialize iterator pIter to scan through the PMA stored in file pFile
** starting at offset iStart and ending at offset iEof-1. This function 
** leaves the iterator pointing to the first key in the PMA (or EOF if the 
** PMA is empty).
*/
static int vdbeSorterIterInit(
  SortSubtask *pThread,           /* Thread context */
  i64 iStart,                     /* Start offset in pThread->pTemp1 */
  VdbeSorterIter *pIter,          /* Iterator to populate */
  i64 *pnByte                     /* IN/OUT: Increment this value by PMA size */
){
  int rc = SQLITE_OK;
  int nBuf = pThread->pgsz;
  void *pMap = 0;                 /* Mapping of temp file */
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
** is assumed that the (pThread->pUnpacked) structure already contains the 
** unpacked key to use as key2.
**
** If an OOM error is encountered, (pThread->pUnpacked->error_rc) is set
** to SQLITE_NOMEM.
*/
static int vdbeSorterCompare(
  SorterThread *pThread,          /* Thread context (for pKeyInfo) */
  const void *pKey1, int nKey1,   /* Left side of comparison */
  const void *pKey2, int nKey2    /* Right side of comparison */
){
  UnpackedRecord *r2 = pThread->pUnpacked;
  if( pKey2 ){
    sqlite3VdbeRecordUnpack(pThread->pKeyInfo, nKey2, pKey2, r2);
  }
  return sqlite3VdbeRecordCompare(nKey1, pKey1, r2, 0);
}

/*
** This function is called to compare two iterator keys when merging 
** multiple b-tree segments. Parameter iOut is the index of the aTree[] 
** value to recalculate.
*/
static int vdbeSorterDoCompare(
  SorterThread *pThread, 
  SorterMerger *pMerger, 
  int iOut
){
  int i1;
  int i2;
  int iRes;
  VdbeSorterIter *p1;







|
















|







497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
** is assumed that the (pThread->pUnpacked) structure already contains the 
** unpacked key to use as key2.
**
** If an OOM error is encountered, (pThread->pUnpacked->error_rc) is set
** to SQLITE_NOMEM.
*/
static int vdbeSorterCompare(
  SortSubtask *pThread,           /* Thread context (for pKeyInfo) */
  const void *pKey1, int nKey1,   /* Left side of comparison */
  const void *pKey2, int nKey2    /* Right side of comparison */
){
  UnpackedRecord *r2 = pThread->pUnpacked;
  if( pKey2 ){
    sqlite3VdbeRecordUnpack(pThread->pKeyInfo, nKey2, pKey2, r2);
  }
  return sqlite3VdbeRecordCompare(nKey1, pKey1, r2, 0);
}

/*
** This function is called to compare two iterator keys when merging 
** multiple b-tree segments. Parameter iOut is the index of the aTree[] 
** value to recalculate.
*/
static int vdbeSorterDoCompare(
  SortSubtask *pThread, 
  SorterMerger *pMerger, 
  int iOut
){
  int i1;
  int i2;
  int iRes;
  VdbeSorterIter *p1;
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557

  if( p1->pFile==0 ){
    iRes = i2;
  }else if( p2->pFile==0 ){
    iRes = i1;
  }else{
    int res;
    assert( pThread->pUnpacked!=0 );  /* allocated in vdbeSorterThreadMain() */
    res = vdbeSorterCompare(
        pThread, p1->aKey, p1->nKey, p2->aKey, p2->nKey
    );
    if( res<=0 ){
      iRes = i1;
    }else{
      iRes = i2;







|







543
544
545
546
547
548
549
550
551
552
553
554
555
556
557

  if( p1->pFile==0 ){
    iRes = i2;
  }else if( p2->pFile==0 ){
    iRes = i1;
  }else{
    int res;
    assert( pThread->pUnpacked!=0 );  /* allocated in vdbeSortSubtaskMain() */
    res = vdbeSorterCompare(
        pThread, p1->aKey, p1->nKey, p2->aKey, p2->nKey
    );
    if( res<=0 ){
      iRes = i1;
    }else{
      iRes = i2;
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
  int szKeyInfo;                  /* Size of pCsr->pKeyInfo in bytes */
  int sz;                         /* Size of pSorter in bytes */
  int rc = SQLITE_OK;
  int nWorker = (sqlite3GlobalConfig.bCoreMutex?sqlite3GlobalConfig.nWorker:0);

  assert( pCsr->pKeyInfo && pCsr->pBt==0 );
  szKeyInfo = sizeof(KeyInfo) + (pCsr->pKeyInfo->nField-1)*sizeof(CollSeq*);
  sz = sizeof(VdbeSorter) + nWorker * sizeof(SorterThread);

  pSorter = (VdbeSorter*)sqlite3DbMallocZero(db, sz + szKeyInfo);
  pCsr->pSorter = pSorter;
  if( pSorter==0 ){
    rc = SQLITE_NOMEM;
  }else{
    pKeyInfo = (KeyInfo*)((u8*)pSorter + sz);
    memcpy(pKeyInfo, pCsr->pKeyInfo, szKeyInfo);
    pKeyInfo->db = 0;
    if( nField && nWorker==0 ) pKeyInfo->nField = nField;
    pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt);

    pSorter->nThread = nWorker + 1;
    for(i=0; i<pSorter->nThread; i++){
      SorterThread *pThread = &pSorter->aThread[i];
      pThread->pKeyInfo = pKeyInfo;
      pThread->pVfs = db->pVfs;
      pThread->pgsz = pgsz;
    }

    if( !sqlite3TempInMemory(db) ){
      pSorter->mnPmaSize = SORTER_MIN_WORKING * pgsz;







|














|







574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
  int szKeyInfo;                  /* Size of pCsr->pKeyInfo in bytes */
  int sz;                         /* Size of pSorter in bytes */
  int rc = SQLITE_OK;
  int nWorker = (sqlite3GlobalConfig.bCoreMutex?sqlite3GlobalConfig.nWorker:0);

  assert( pCsr->pKeyInfo && pCsr->pBt==0 );
  szKeyInfo = sizeof(KeyInfo) + (pCsr->pKeyInfo->nField-1)*sizeof(CollSeq*);
  sz = sizeof(VdbeSorter) + nWorker * sizeof(SortSubtask);

  pSorter = (VdbeSorter*)sqlite3DbMallocZero(db, sz + szKeyInfo);
  pCsr->pSorter = pSorter;
  if( pSorter==0 ){
    rc = SQLITE_NOMEM;
  }else{
    pKeyInfo = (KeyInfo*)((u8*)pSorter + sz);
    memcpy(pKeyInfo, pCsr->pKeyInfo, szKeyInfo);
    pKeyInfo->db = 0;
    if( nField && nWorker==0 ) pKeyInfo->nField = nField;
    pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt);

    pSorter->nThread = nWorker + 1;
    for(i=0; i<pSorter->nThread; i++){
      SortSubtask *pThread = &pSorter->aThread[i];
      pThread->pKeyInfo = pKeyInfo;
      pThread->pVfs = db->pVfs;
      pThread->pgsz = pgsz;
    }

    if( !sqlite3TempInMemory(db) ){
      pSorter->mnPmaSize = SORTER_MIN_WORKING * pgsz;
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
  }
}

/*
** Free all resources owned by the object indicated by argument pThread. All 
** fields of *pThread are zeroed before returning.
*/
static void vdbeSorterThreadCleanup(sqlite3 *db, SorterThread *pThread){
  sqlite3DbFree(db, pThread->pUnpacked);
  pThread->pUnpacked = 0;
  if( pThread->aListMemory==0 ){
    vdbeSorterRecordFree(0, pThread->pList);
  }else{
    sqlite3_free(pThread->aListMemory);
    pThread->aListMemory = 0;







|







632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
  }
}

/*
** Free all resources owned by the object indicated by argument pThread. All 
** fields of *pThread are zeroed before returning.
*/
static void vdbeSortSubtaskCleanup(sqlite3 *db, SortSubtask *pThread){
  sqlite3DbFree(db, pThread->pUnpacked);
  pThread->pUnpacked = 0;
  if( pThread->aListMemory==0 ){
    vdbeSorterRecordFree(0, pThread->pList);
  }else{
    sqlite3_free(pThread->aListMemory);
    pThread->aListMemory = 0;
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
** Join all threads.  
*/
#if SQLITE_MAX_WORKER_THREADS>0
static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){
  int rc = rcin;
  int i;
  for(i=0; i<pSorter->nThread; i++){
    SorterThread *pThread = &pSorter->aThread[i];
    if( pThread->pThread ){
      void *pRet;
      int rc2 = sqlite3ThreadJoin(pThread->pThread, &pRet);
      pThread->pThread = 0;
      pThread->bDone = 0;
      if( rc==SQLITE_OK ) rc = rc2;
      if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet);







|







656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
** Join all threads.  
*/
#if SQLITE_MAX_WORKER_THREADS>0
static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){
  int rc = rcin;
  int i;
  for(i=0; i<pSorter->nThread; i++){
    SortSubtask *pThread = &pSorter->aThread[i];
    if( pThread->pThread ){
      void *pRet;
      int rc2 = sqlite3ThreadJoin(pThread->pThread, &pRet);
      pThread->pThread = 0;
      pThread->bDone = 0;
      if( rc==SQLITE_OK ) rc = rc2;
      if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet);
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
/*
** Reset a sorting cursor back to its original empty state.
*/
void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){
  int i;
  vdbeSorterJoinAll(pSorter, SQLITE_OK);
  for(i=0; i<pSorter->nThread; i++){
    SorterThread *pThread = &pSorter->aThread[i];
    vdbeSorterThreadCleanup(db, pThread);
  }
  if( pSorter->aMemory==0 ){
    vdbeSorterRecordFree(0, pSorter->pRecord);
  }
  vdbeSorterMergerReset(pSorter->pMerger);
  pSorter->pRecord = 0;
  pSorter->nInMemory = 0;







|
|







721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
/*
** Reset a sorting cursor back to its original empty state.
*/
void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){
  int i;
  vdbeSorterJoinAll(pSorter, SQLITE_OK);
  for(i=0; i<pSorter->nThread; i++){
    SortSubtask *pThread = &pSorter->aThread[i];
    vdbeSortSubtaskCleanup(db, pThread);
  }
  if( pSorter->aMemory==0 ){
    vdbeSorterRecordFree(0, pSorter->pRecord);
  }
  vdbeSorterMergerReset(pSorter->pMerger);
  pSorter->pRecord = 0;
  pSorter->nInMemory = 0;
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
}

/*
** Merge the two sorted lists p1 and p2 into a single list.
** Set *ppOut to the head of the new list.
*/
static void vdbeSorterMerge(
  SorterThread *pThread,          /* Calling thread context */
  SorterRecord *p1,               /* First list to merge */
  SorterRecord *p2,               /* Second list to merge */
  SorterRecord **ppOut            /* OUT: Head of merged list */
){
  SorterRecord *pFinal = 0;
  SorterRecord **pp = &pFinal;
  void *pVal2 = p2 ? SRVAL(p2) : 0;







|







772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
}

/*
** Merge the two sorted lists p1 and p2 into a single list.
** Set *ppOut to the head of the new list.
*/
static void vdbeSorterMerge(
  SortSubtask *pThread,          /* Calling thread context */
  SorterRecord *p1,               /* First list to merge */
  SorterRecord *p2,               /* Second list to merge */
  SorterRecord **ppOut            /* OUT: Head of merged list */
){
  SorterRecord *pFinal = 0;
  SorterRecord **pp = &pFinal;
  void *pVal2 = p2 ? SRVAL(p2) : 0;
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
}

/*
** Sort the linked list of records headed at pThread->pList. Return 
** SQLITE_OK if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if 
** an error occurs.
*/
static int vdbeSorterSort(SorterThread *pThread){
  int i;
  SorterRecord **aSlot;
  SorterRecord *p;

  aSlot = (SorterRecord **)sqlite3MallocZero(64 * sizeof(SorterRecord *));
  if( !aSlot ){
    return SQLITE_NOMEM;







|







806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
}

/*
** Sort the linked list of records headed at pThread->pList. Return 
** SQLITE_OK if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if 
** an error occurs.
*/
static int vdbeSorterSort(SortSubtask *pThread){
  int i;
  SorterRecord **aSlot;
  SorterRecord *p;

  aSlot = (SorterRecord **)sqlite3MallocZero(64 * sizeof(SorterRecord *));
  if( !aSlot ){
    return SQLITE_NOMEM;
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
**     * A varint. This varint contains the total number of bytes of content
**       in the PMA (not including the varint itself).
**
**     * One or more records packed end-to-end in order of ascending keys. 
**       Each record consists of a varint followed by a blob of data (the 
**       key). The varint is the number of bytes in the blob of data.
*/
static int vdbeSorterListToPMA(SorterThread *pThread){
  int rc = SQLITE_OK;             /* Return code */
  FileWriter writer;              /* Object used to write to the file */

  memset(&writer, 0, sizeof(FileWriter));
  assert( pThread->nInMemory>0 );

  /* If the first temporary PMA file has not been opened, open it now. */







|







970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
**     * A varint. This varint contains the total number of bytes of content
**       in the PMA (not including the varint itself).
**
**     * One or more records packed end-to-end in order of ascending keys. 
**       Each record consists of a varint followed by a blob of data (the 
**       key). The varint is the number of bytes in the blob of data.
*/
static int vdbeSorterListToPMA(SortSubtask *pThread){
  int rc = SQLITE_OK;             /* Return code */
  FileWriter writer;              /* Object used to write to the file */

  memset(&writer, 0, sizeof(FileWriter));
  assert( pThread->nInMemory>0 );

  /* If the first temporary PMA file has not been opened, open it now. */
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
** Advance the SorterMerger iterator passed as the second argument to
** the next entry. Set *pbEof to true if this means the iterator has 
** reached EOF.
**
** Return SQLITE_OK if successful or an error code if an error occurs.
*/
static int vdbeSorterNext(
  SorterThread *pThread, 
  SorterMerger *pMerger, 
  int *pbEof
){
  int rc;
  int iPrev = pMerger->aTree[1];/* Index of iterator to advance */

  /* Advance the current iterator */







|







1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
** Advance the SorterMerger iterator passed as the second argument to
** the next entry. Set *pbEof to true if this means the iterator has 
** reached EOF.
**
** Return SQLITE_OK if successful or an error code if an error occurs.
*/
static int vdbeSorterNext(
  SortSubtask *pThread, 
  SorterMerger *pMerger, 
  int *pbEof
){
  int rc;
  int iPrev = pMerger->aTree[1];/* Index of iterator to advance */

  /* Advance the current iterator */
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107

  return rc;
}

/*
** The main routine for sorter-thread operations.
*/
static void *vdbeSorterThreadMain(void *pCtx){
  int rc = SQLITE_OK;
  SorterThread *pThread = (SorterThread*)pCtx;

  assert( pThread->eWork==SORTER_THREAD_SORT
       || pThread->eWork==SORTER_THREAD_TO_PMA
       || pThread->eWork==SORTER_THREAD_CONS
  );
  assert( pThread->bDone==0 );








|

|







1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107

  return rc;
}

/*
** The main routine for sorter-thread operations.
*/
static void *vdbeSortSubtaskMain(void *pCtx){
  int rc = SQLITE_OK;
  SortSubtask *pThread = (SortSubtask*)pCtx;

  assert( pThread->eWork==SORTER_THREAD_SORT
       || pThread->eWork==SORTER_THREAD_TO_PMA
       || pThread->eWork==SORTER_THREAD_CONS
  );
  assert( pThread->bDone==0 );

1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
  return SQLITE_INT_TO_PTR(rc);
}

/*
** Run the activity scheduled by the object passed as the only argument
** in the current thread.
*/
static int vdbeSorterRunThread(SorterThread *pThread){
  int rc = SQLITE_PTR_TO_INT( vdbeSorterThreadMain((void*)pThread) );
  assert( pThread->bDone );
  pThread->bDone = 0;
  return rc;
}

/*
** Flush the current contents of VdbeSorter.pRecord to a new PMA, possibly
** using a background thread.
**
** If argument bFg is non-zero, the operation always uses the calling thread.
*/
static int vdbeSorterFlushPMA(sqlite3 *db, const VdbeCursor *pCsr, int bFg){
  VdbeSorter *pSorter = pCsr->pSorter;
  int rc = SQLITE_OK;
  int i;
  SorterThread *pThread = 0;    /* Thread context used to create new PMA */
  int nWorker = (pSorter->nThread-1);

  pSorter->bUsePMA = 1;
  for(i=0; i<nWorker; i++){
    int iTest = (pSorter->iPrev + i + 1) % nWorker;
    pThread = &pSorter->aThread[iTest];
#if SQLITE_MAX_WORKER_THREADS>0







|
|















|







1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
  return SQLITE_INT_TO_PTR(rc);
}

/*
** Run the activity scheduled by the object passed as the only argument
** in the current thread.
*/
static int vdbeSorterRunThread(SortSubtask *pThread){
  int rc = SQLITE_PTR_TO_INT( vdbeSortSubtaskMain((void*)pThread) );
  assert( pThread->bDone );
  pThread->bDone = 0;
  return rc;
}

/*
** Flush the current contents of VdbeSorter.pRecord to a new PMA, possibly
** using a background thread.
**
** If argument bFg is non-zero, the operation always uses the calling thread.
*/
static int vdbeSorterFlushPMA(sqlite3 *db, const VdbeCursor *pCsr, int bFg){
  VdbeSorter *pSorter = pCsr->pSorter;
  int rc = SQLITE_OK;
  int i;
  SortSubtask *pThread = 0;    /* Thread context used to create new PMA */
  int nWorker = (pSorter->nThread-1);

  pSorter->bUsePMA = 1;
  for(i=0; i<nWorker; i++){
    int iTest = (pSorter->iPrev + i + 1) % nWorker;
    pThread = &pSorter->aThread[iTest];
#if SQLITE_MAX_WORKER_THREADS>0
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
        if( pSorter->aMemory==0 ){
          pSorter->aMemory = sqlite3Malloc(pSorter->nMemory);
          if( pSorter->aMemory==0 ) return SQLITE_NOMEM;
        }else{
          pSorter->nMemory = sqlite3MallocSize(pSorter->aMemory);
        }
      }
      rc = sqlite3ThreadCreate(&pThread->pThread, vdbeSorterThreadMain, pCtx);
    }else
#endif
    {
      /* Use the foreground thread for this operation */
      u8 *aMem;
      rc = vdbeSorterRunThread(pThread);
      aMem = pThread->aListMemory;







|







1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
        if( pSorter->aMemory==0 ){
          pSorter->aMemory = sqlite3Malloc(pSorter->nMemory);
          if( pSorter->aMemory==0 ) return SQLITE_NOMEM;
        }else{
          pSorter->nMemory = sqlite3MallocSize(pSorter->aMemory);
        }
      }
      rc = sqlite3ThreadCreate(&pThread->pThread, vdbeSortSubtaskMain, pCtx);
    }else
#endif
    {
      /* Use the foreground thread for this operation */
      u8 *aMem;
      rc = vdbeSorterRunThread(pThread);
      aMem = pThread->aListMemory;
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
  assert( pSorter );

  /* If no data has been written to disk, then do not do so now. Instead,
  ** sort the VdbeSorter.pRecord list. The vdbe layer will read data directly
  ** from the in-memory list.  */
  if( pSorter->bUsePMA==0 ){
    if( pSorter->pRecord ){
      SorterThread *pThread = &pSorter->aThread[0];
      *pbEof = 0;
      pThread->pList = pSorter->pRecord;
      pThread->eWork = SORTER_THREAD_SORT;
      assert( pThread->aListMemory==0 );
      pThread->aListMemory = pSorter->aMemory;
      rc = vdbeSorterRunThread(pThread);
      pThread->aListMemory = 0;







|







1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
  assert( pSorter );

  /* If no data has been written to disk, then do not do so now. Instead,
  ** sort the VdbeSorter.pRecord list. The vdbe layer will read data directly
  ** from the in-memory list.  */
  if( pSorter->bUsePMA==0 ){
    if( pSorter->pRecord ){
      SortSubtask *pThread = &pSorter->aThread[0];
      *pbEof = 0;
      pThread->pList = pSorter->pRecord;
      pThread->eWork = SORTER_THREAD_SORT;
      assert( pThread->aListMemory==0 );
      pThread->aListMemory = pSorter->aMemory;
      rc = vdbeSorterRunThread(pThread);
      pThread->aListMemory = 0;
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
  rc = vdbeSorterJoinAll(pSorter, rc);

  /* If there are more than SORTER_MAX_MERGE_COUNT PMAs on disk, merge
  ** some of them together so that this is no longer the case. */
  if( vdbeSorterCountPMA(pSorter)>SORTER_MAX_MERGE_COUNT ){
    int i;
    for(i=0; rc==SQLITE_OK && i<pSorter->nThread; i++){
      SorterThread *pThread = &pSorter->aThread[i];
      if( pThread->pTemp1 ){
        pThread->nConsolidate = SORTER_MAX_MERGE_COUNT / pSorter->nThread;
        pThread->eWork = SORTER_THREAD_CONS;

#if SQLITE_MAX_WORKER_THREADS>0
        if( i<(pSorter->nThread-1) ){
          void *pCtx = (void*)pThread;
          rc = sqlite3ThreadCreate(&pThread->pThread,vdbeSorterThreadMain,pCtx);
        }else
#endif
        {
          rc = vdbeSorterRunThread(pThread);
        }
      }
    }







|







|







1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
  rc = vdbeSorterJoinAll(pSorter, rc);

  /* If there are more than SORTER_MAX_MERGE_COUNT PMAs on disk, merge
  ** some of them together so that this is no longer the case. */
  if( vdbeSorterCountPMA(pSorter)>SORTER_MAX_MERGE_COUNT ){
    int i;
    for(i=0; rc==SQLITE_OK && i<pSorter->nThread; i++){
      SortSubtask *pThread = &pSorter->aThread[i];
      if( pThread->pTemp1 ){
        pThread->nConsolidate = SORTER_MAX_MERGE_COUNT / pSorter->nThread;
        pThread->eWork = SORTER_THREAD_CONS;

#if SQLITE_MAX_WORKER_THREADS>0
        if( i<(pSorter->nThread-1) ){
          void *pCtx = (void*)pThread;
          rc = sqlite3ThreadCreate(&pThread->pThread,vdbeSortSubtaskMain,pCtx);
        }else
#endif
        {
          rc = vdbeSorterRunThread(pThread);
        }
      }
    }
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
      rc = SQLITE_NOMEM;
    }else{
      int iIter = 0;
      int iThread = 0;
      for(iThread=0; iThread<pSorter->nThread; iThread++){
        int iPMA;
        i64 iReadOff = 0;
        SorterThread *pThread = &pSorter->aThread[iThread];
        for(iPMA=0; iPMA<pThread->nPMA && rc==SQLITE_OK; iPMA++){
          i64 nDummy = 0;
          VdbeSorterIter *pIter = &pMerger->aIter[iIter++];
          rc = vdbeSorterIterInit(pThread, iReadOff, pIter, &nDummy);
          iReadOff = pIter->iEof;
        }
      }







|







1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
      rc = SQLITE_NOMEM;
    }else{
      int iIter = 0;
      int iThread = 0;
      for(iThread=0; iThread<pSorter->nThread; iThread++){
        int iPMA;
        i64 iReadOff = 0;
        SortSubtask *pThread = &pSorter->aThread[iThread];
        for(iPMA=0; iPMA<pThread->nPMA && rc==SQLITE_OK; iPMA++){
          i64 nDummy = 0;
          VdbeSorterIter *pIter = &pMerger->aIter[iIter++];
          rc = vdbeSorterIterInit(pThread, iReadOff, pIter, &nDummy);
          iReadOff = pIter->iEof;
        }
      }