Many hyperlinks are disabled.
Use anonymous login
to enable hyperlinks.
Overview
Comment: | Even if compile time option SQLITE_MAX_WORKER_THREADS is set to one or greater, set the default number of worker threads to zero. Distribute data more evenly between threads in sqlite3VdbeSorterWrite() to improve performance when sorting large amounts of data. Add new test file sort2.test. |
---|---|
Downloads: | Tarball | ZIP archive |
Timelines: | family | ancestors | descendants | both | threads |
Files: | files | file ages | folders |
SHA1: |
643c86a056168e39fcb7f39b8a72731f |
User & Date: | dan 2014-04-01 15:38:44.990 |
Context
2014-04-01
| ||
18:41 | When sorting data for a CREATE INDEX statement in single-threaded mode, assume that keys are delivered to the sorter in primary key order. Also fix various comments that had fallen out of date. (check-in: 821d1ac450 user: dan tags: threads) | |
15:38 | Even if compile time option SQLITE_MAX_WORKER_THREADS is set to one or greater, set the default number of worker threads to zero. Distribute data more evenly between threads in sqlite3VdbeSorterWrite() to improve performance when sorting large amounts of data. Add new test file sort2.test. (check-in: 643c86a056 user: dan tags: threads) | |
10:19 | Fix a problem with OOM handling in the sorter code. (check-in: 59cd5229e2 user: dan tags: threads) | |
Changes
Changes to src/global.c.
︙ | ︙ | |||
163 164 165 166 167 168 169 | 0, /* szScratch */ 0, /* nScratch */ (void*)0, /* pPage */ 0, /* szPage */ 0, /* nPage */ 0, /* mxParserStack */ 0, /* sharedCacheEnabled */ | | | 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 | 0, /* szScratch */ 0, /* nScratch */ (void*)0, /* pPage */ 0, /* szPage */ 0, /* nPage */ 0, /* mxParserStack */ 0, /* sharedCacheEnabled */ 0, /* nWorker */ /* All the rest should always be initialized to zero */ 0, /* isInit */ 0, /* inProgress */ 0, /* isMutexInit */ 0, /* isMallocInit */ 0, /* isPCacheInit */ 0, /* pInitMutex */ |
︙ | ︙ |
Changes to src/shell.c.
︙ | ︙ | |||
3528 3529 3530 3531 3532 3533 3534 3535 3536 3537 3538 3539 3540 3541 | data->mode = MODE_List; memcpy(data->separator,"|", 2); data->showHeader = 0; sqlite3_config(SQLITE_CONFIG_URI, 1); sqlite3_config(SQLITE_CONFIG_LOG, shellLog, data); sqlite3_snprintf(sizeof(mainPrompt), mainPrompt,"sqlite> "); sqlite3_snprintf(sizeof(continuePrompt), continuePrompt," ...> "); } /* ** Output text to the console in a font that attracts extra attention. */ #ifdef _WIN32 static void printBold(const char *zText){ | > > | 3528 3529 3530 3531 3532 3533 3534 3535 3536 3537 3538 3539 3540 3541 3542 3543 | data->mode = MODE_List; memcpy(data->separator,"|", 2); data->showHeader = 0; sqlite3_config(SQLITE_CONFIG_URI, 1); sqlite3_config(SQLITE_CONFIG_LOG, shellLog, data); sqlite3_snprintf(sizeof(mainPrompt), mainPrompt,"sqlite> "); sqlite3_snprintf(sizeof(continuePrompt), continuePrompt," ...> "); sqlite3_config(SQLITE_CONFIG_MULTITHREAD); sqlite3_config(SQLITE_CONFIG_WORKER_THREADS, 3); } /* ** Output text to the console in a font that attracts extra attention. */ #ifdef _WIN32 static void printBold(const char *zText){ |
︙ | ︙ |
Changes to src/test_config.c.
︙ | ︙ | |||
94 95 96 97 98 99 100 101 102 103 104 105 106 107 | #endif #if SQLITE_MAX_MMAP_SIZE>0 Tcl_SetVar2(interp, "sqlite_options", "mmap", "1", TCL_GLOBAL_ONLY); #else Tcl_SetVar2(interp, "sqlite_options", "mmap", "0", TCL_GLOBAL_ONLY); #endif #if 1 /* def SQLITE_MEMDEBUG */ Tcl_SetVar2(interp, "sqlite_options", "memdebug", "1", TCL_GLOBAL_ONLY); #else Tcl_SetVar2(interp, "sqlite_options", "memdebug", "0", TCL_GLOBAL_ONLY); #endif | > > > > > > | 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 | #endif #if SQLITE_MAX_MMAP_SIZE>0 Tcl_SetVar2(interp, "sqlite_options", "mmap", "1", TCL_GLOBAL_ONLY); #else Tcl_SetVar2(interp, "sqlite_options", "mmap", "0", TCL_GLOBAL_ONLY); #endif #if SQLITE_MAX_WORKER_THREADS>0 Tcl_SetVar2(interp, "sqlite_options", "worker_threads", "1", TCL_GLOBAL_ONLY); #else Tcl_SetVar2(interp, "sqlite_options", "worker_threads", "0", TCL_GLOBAL_ONLY); #endif #if 1 /* def SQLITE_MEMDEBUG */ Tcl_SetVar2(interp, "sqlite_options", "memdebug", "1", TCL_GLOBAL_ONLY); #else Tcl_SetVar2(interp, "sqlite_options", "memdebug", "0", TCL_GLOBAL_ONLY); #endif |
︙ | ︙ |
Changes to src/test_malloc.c.
︙ | ︙ | |||
1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 | } rc = sqlite3_config(SQLITE_CONFIG_COVERING_INDEX_SCAN, bUseCis); Tcl_SetResult(interp, (char *)sqlite3ErrName(rc), TCL_VOLATILE); return TCL_OK; } /* ** Usage: sqlite3_dump_memsys3 FILENAME ** sqlite3_dump_memsys5 FILENAME ** ** Write a summary of unfreed memsys3 allocations to FILENAME. */ | > > > > > > > > > > > > > > > > > > > > > > > > > > | 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 | } rc = sqlite3_config(SQLITE_CONFIG_COVERING_INDEX_SCAN, bUseCis); Tcl_SetResult(interp, (char *)sqlite3ErrName(rc), TCL_VOLATILE); return TCL_OK; } /* ** Usage: sqlite3_config_worker_threads N */ static int test_config_worker_threads( void * clientData, Tcl_Interp *interp, int objc, Tcl_Obj *CONST objv[] ){ int rc; int nThread; if( objc!=2 ){ Tcl_WrongNumArgs(interp, 1, objv, "N"); return TCL_ERROR; } if( Tcl_GetIntFromObj(interp, objv[1], &nThread) ){ return TCL_ERROR; } rc = sqlite3_config(SQLITE_CONFIG_WORKER_THREADS, nThread); Tcl_SetResult(interp, (char *)sqlite3ErrName(rc), TCL_VOLATILE); return TCL_OK; } /* ** Usage: sqlite3_dump_memsys3 FILENAME ** sqlite3_dump_memsys5 FILENAME ** ** Write a summary of unfreed memsys3 allocations to FILENAME. */ |
︙ | ︙ | |||
1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 | { "sqlite3_config_heap", test_config_heap ,0 }, { "sqlite3_config_heap_size", test_config_heap_size ,0 }, { "sqlite3_config_memstatus", test_config_memstatus ,0 }, { "sqlite3_config_lookaside", test_config_lookaside ,0 }, { "sqlite3_config_error", test_config_error ,0 }, { "sqlite3_config_uri", test_config_uri ,0 }, { "sqlite3_config_cis", test_config_cis ,0 }, { "sqlite3_db_config_lookaside",test_db_config_lookaside ,0 }, { "sqlite3_dump_memsys3", test_dump_memsys3 ,3 }, { "sqlite3_dump_memsys5", test_dump_memsys3 ,5 }, { "sqlite3_install_memsys3", test_install_memsys3 ,0 }, { "sqlite3_memdebug_vfs_oom_test", test_vfs_oom_test ,0 }, }; int i; | > | 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 | { "sqlite3_config_heap", test_config_heap ,0 }, { "sqlite3_config_heap_size", test_config_heap_size ,0 }, { "sqlite3_config_memstatus", test_config_memstatus ,0 }, { "sqlite3_config_lookaside", test_config_lookaside ,0 }, { "sqlite3_config_error", test_config_error ,0 }, { "sqlite3_config_uri", test_config_uri ,0 }, { "sqlite3_config_cis", test_config_cis ,0 }, { "sqlite3_config_worker_threads", test_config_worker_threads ,0 }, { "sqlite3_db_config_lookaside",test_db_config_lookaside ,0 }, { "sqlite3_dump_memsys3", test_dump_memsys3 ,3 }, { "sqlite3_dump_memsys5", test_dump_memsys3 ,5 }, { "sqlite3_install_memsys3", test_install_memsys3 ,0 }, { "sqlite3_memdebug_vfs_oom_test", test_vfs_oom_test ,0 }, }; int i; |
︙ | ︙ |
Changes to src/vdbesort.c.
︙ | ︙ | |||
176 177 178 179 180 181 182 183 184 185 186 187 188 189 | int mxPmaSize; /* Maximum PMA size, in bytes. 0==no limit */ int bUsePMA; /* True if one or more PMAs created */ SorterRecord *pRecord; /* Head of in-memory record list */ SorterMerger *pMerger; /* For final merge of PMAs (by caller) */ u8 *aMemory; /* Block of memory to alloc records from */ int iMemory; /* Offset of first free byte in aMemory */ int nMemory; /* Size of aMemory allocation in bytes */ int nThread; /* Size of aThread[] array */ SorterThread aThread[1]; }; /* ** The following type is an iterator for a PMA. It caches the current key in ** variables nKey/aKey. If the iterator is at EOF, pFile==0. | > | 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 | int mxPmaSize; /* Maximum PMA size, in bytes. 0==no limit */ int bUsePMA; /* True if one or more PMAs created */ SorterRecord *pRecord; /* Head of in-memory record list */ SorterMerger *pMerger; /* For final merge of PMAs (by caller) */ u8 *aMemory; /* Block of memory to alloc records from */ int iMemory; /* Offset of first free byte in aMemory */ int nMemory; /* Size of aMemory allocation in bytes */ int iPrev; /* Previous thread used to flush PMA */ int nThread; /* Size of aThread[] array */ SorterThread aThread[1]; }; /* ** The following type is an iterator for a PMA. It caches the current key in ** variables nKey/aKey. If the iterator is at EOF, pFile==0. |
︙ | ︙ | |||
1247 1248 1249 1250 1251 1252 1253 | ** ** If argument bFg is non-zero, the operation always uses the calling thread. */ static int vdbeSorterFlushPMA(sqlite3 *db, const VdbeCursor *pCsr, int bFg){ VdbeSorter *pSorter = pCsr->pSorter; int rc = SQLITE_OK; int i; | | > | > | > > > > > | | 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 | ** ** If argument bFg is non-zero, the operation always uses the calling thread. */ static int vdbeSorterFlushPMA(sqlite3 *db, const VdbeCursor *pCsr, int bFg){ VdbeSorter *pSorter = pCsr->pSorter; int rc = SQLITE_OK; int i; SorterThread *pThread = 0; /* Thread context used to create new PMA */ int nWorker = (pSorter->nThread-1); pSorter->bUsePMA = 1; for(i=0; i<nWorker; i++){ int iTest = (pSorter->iPrev + i + 1) % nWorker; pThread = &pSorter->aThread[iTest]; #if SQLITE_MAX_WORKER_THREADS>0 if( pThread->bDone ){ void *pRet; assert( pThread->pThread ); rc = sqlite3ThreadJoin(pThread->pThread, &pRet); pThread->pThread = 0; pThread->bDone = 0; if( rc==SQLITE_OK ){ rc = SQLITE_PTR_TO_INT(pRet); } } #endif if( pThread->pThread==0 ) break; pThread = 0; } if( pThread==0 ){ pThread = &pSorter->aThread[nWorker]; } pSorter->iPrev = (pThread - pSorter->aThread); if( rc==SQLITE_OK ){ assert( pThread->pThread==0 && pThread->bDone==0 ); pThread->eWork = SORTER_THREAD_TO_PMA; pThread->pList = pSorter->pRecord; pThread->nInMemory = pSorter->nInMemory; pSorter->nInMemory = 0; pSorter->pRecord = 0; if( pSorter->aMemory ){ u8 *aMem = pThread->aListMemory; pThread->aListMemory = pSorter->aMemory; pSorter->aMemory = aMem; } #if SQLITE_MAX_WORKER_THREADS>0 if( !bFg && pThread!=&pSorter->aThread[nWorker] ){ /* Launch a background thread for this operation */ void *pCtx = (void*)pThread; assert( pSorter->aMemory==0 || pThread->aListMemory!=0 ); if( pThread->aListMemory ){ if( pSorter->aMemory==0 ){ pSorter->aMemory = sqlite3Malloc(pSorter->nMemory); if( pSorter->aMemory==0 ) return SQLITE_NOMEM; |
︙ | ︙ | |||
1460 1461 1462 1463 1464 1465 1466 | /* If there are more than SORTER_MAX_MERGE_COUNT PMAs on disk, merge ** some of them together so that this is no longer the case. */ if( vdbeSorterCountPMA(pSorter)>SORTER_MAX_MERGE_COUNT ){ int i; for(i=0; rc==SQLITE_OK && i<pSorter->nThread; i++){ SorterThread *pThread = &pSorter->aThread[i]; if( pThread->pTemp1 ){ | | | 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 | /* If there are more than SORTER_MAX_MERGE_COUNT PMAs on disk, merge ** some of them together so that this is no longer the case. */ if( vdbeSorterCountPMA(pSorter)>SORTER_MAX_MERGE_COUNT ){ int i; for(i=0; rc==SQLITE_OK && i<pSorter->nThread; i++){ SorterThread *pThread = &pSorter->aThread[i]; if( pThread->pTemp1 ){ pThread->nConsolidate = SORTER_MAX_MERGE_COUNT / pSorter->nThread; pThread->eWork = SORTER_THREAD_CONS; #if SQLITE_MAX_WORKER_THREADS>0 if( i<(pSorter->nThread-1) ){ void *pCtx = (void*)pThread; rc = sqlite3ThreadCreate(&pThread->pThread,vdbeSorterThreadMain,pCtx); }else |
︙ | ︙ |
Added test/sort2.test.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 | # 2014 March 25. # # The author disclaims copyright to this source code. In place of # a legal notice, here is a blessing: # # May you do good and not evil. # May you find forgiveness for yourself and forgive others. # May you share freely, never taking more than you give. # #*********************************************************************** # This file implements regression tests for SQLite library. # set testdir [file dirname $argv0] source $testdir/tester.tcl set testprefix sort2 db close sqlite3_shutdown sqlite3_config_worker_threads 7 reset_db do_execsql_test 1 { PRAGMA cache_size = 5; WITH r(x,y) AS ( SELECT 1, randomblob(100) UNION ALL SELECT x+1, randomblob(100) FROM r LIMIT 100000 ) SELECT count(x), length(y) FROM r GROUP BY (x%5) } { 20000 100 20000 100 20000 100 20000 100 20000 100 } db close sqlite3_shutdown sqlite3_config_worker_threads 0 sqlite3_initialize finish_test |