/ Check-in [6340ca5e]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Add some assert() statements to the asychronous backend demo to enforce the strategy used to avoid deadlock. Also a minor change to avoid a potential deadlock. (CVS 4520)
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA1: 6340ca5eee3d398a9ef4f37a442efad37c9bf547
User & Date: danielk1977 2007-11-02 09:07:58
Context
2007-11-02
12:52
Fix building of sqlite3_analyzer on Mac. (CVS 4521) check-in: a616b6cb user: drh tags: trunk
09:07
Add some assert() statements to the asychronous backend demo to enforce the strategy used to avoid deadlock. Also a minor change to avoid a potential deadlock. (CVS 4520) check-in: 6340ca5e user: danielk1977 tags: trunk
2007-11-01
17:38
Add a prototype "group_concat()" aggregate function to func.c. Disabled by default. No documentation nor test cases. No effort to make it efficient. (CVS 4519) check-in: 61987a89 user: drh tags: trunk
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to src/test_async.c.

153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
...
259
260
261
262
263
264
265

266
267
268
269
270
271
272
273
274
275
...
408
409
410
411
412
413
414





























































































































































415
416
417
418
419
420
421
...
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
...
927
928
929
930
931
932
933




934
935
936
937
938
939
940
....
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
** Basic rules:
**
**     * Both read and write access to the global write-op queue must be 
**       protected by the async.queueMutex. As are the async.ioError and
**       async.nFile variables.
**
**     * The async.aLock hash-table and all AsyncLock and AsyncFileLock
**       structures must be protected by teh async.lockMutex mutex.
**
**     * The file handles from the underlying system are assumed not to 
**       be thread safe.
**
**     * See the last two paragraphs under "The Writer Thread" for
**       an assumption to do with file-handle synchronization by the Os.
**
................................................................................
/*
** State information is held in the static variable "async" defined
** as the following structure.
**
** Both async.ioError and async.nFile are protected by async.queueMutex.
*/
static struct TestAsyncStaticData {

  pthread_mutex_t queueMutex;  /* Mutex for access to write operation queue */
  pthread_mutex_t writerMutex; /* Prevents multiple writer threads */
  pthread_mutex_t lockMutex;   /* For access to aLock hash table */
  pthread_cond_t queueSignal;  /* For waking up sleeping writer thread */
  pthread_cond_t emptySignal;  /* Notify when the write queue is empty */
  AsyncWrite *pQueueFirst;     /* Next write operation to be processed */
  AsyncWrite *pQueueLast;      /* Last write operation on the list */
  Hash aLock;                  /* Files locked */
  volatile int ioDelay;             /* Extra delay between write operations */
  volatile int writerHaltWhenIdle;  /* Writer thread halts when queue empty */
................................................................................
  int nName;                 /* Number of characters in zName */
  sqlite3_file *pBaseRead;   /* Read handle to the underlying Os file */
  sqlite3_file *pBaseWrite;  /* Write handle to the underlying Os file */
  AsyncFileLock lock;
  AsyncWrite close;
};






























































































































































/*
** Add an entry to the end of the global write-op list. pWrite should point 
** to an AsyncWrite structure allocated using sqlite3_malloc().  The writer
** thread will call sqlite3_free() to free the structure after the specified
** operation has been completed.
**
** Once an AsyncWrite structure has been added to the list, it becomes the
................................................................................
    }
  }

  if( rc==SQLITE_OK ){
    HashElem *pElem;
    p->pMethod = &async_methods;
    p->pData = pData;
    incrOpenFileCount();

    /* Link AsyncFileData.lock into the linked list of 
    ** AsyncFileLock structures for this file.
    */
    pData->lock.pNext = pLock->pList;
    pLock->pList = &pData->lock;

................................................................................
  }else{
    sqlite3OsClose(pData->pBaseRead);
    sqlite3OsClose(pData->pBaseWrite);
    sqlite3_free(pData);
  }

  pthread_mutex_unlock(&async.lockMutex);





  if( rc==SQLITE_OK && isExclusive ){
    rc = addNewAsyncWrite(pData, ASYNC_OPENEXCLUSIVE, (i64)flags, 0, 0);
    if( rc==SQLITE_OK ){
      if( pOutFlags ) *pOutFlags = flags;
    }else{
      pthread_mutex_lock(&async.lockMutex);
................................................................................
  Tcl_Interp *interp,
  int objc,
  Tcl_Obj *CONST objv[]
){
  pthread_t x;
  int rc;
  volatile int isStarted = 0;
  rc = pthread_create(&x, 0, asyncWriterThread, &isStarted);
  if( rc ){
    Tcl_AppendResult(interp, "failed to create the thread", 0);
    return TCL_ERROR;
  }
  pthread_detach(x);
  while( isStarted==0 ){
    sched_yield();







|







 







>


<







 







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







<







 







>
>
>
>







 







|







153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
...
259
260
261
262
263
264
265
266
267
268

269
270
271
272
273
274
275
...
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
....
1067
1068
1069
1070
1071
1072
1073

1074
1075
1076
1077
1078
1079
1080
....
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
....
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
** Basic rules:
**
**     * Both read and write access to the global write-op queue must be 
**       protected by the async.queueMutex. As are the async.ioError and
**       async.nFile variables.
**
**     * The async.aLock hash-table and all AsyncLock and AsyncFileLock
**       structures must be protected by the async.lockMutex mutex.
**
**     * The file handles from the underlying system are assumed not to 
**       be thread safe.
**
**     * See the last two paragraphs under "The Writer Thread" for
**       an assumption to do with file-handle synchronization by the Os.
**
................................................................................
/*
** State information is held in the static variable "async" defined
** as the following structure.
**
** Both async.ioError and async.nFile are protected by async.queueMutex.
*/
static struct TestAsyncStaticData {
  pthread_mutex_t lockMutex;   /* For access to aLock hash table */
  pthread_mutex_t queueMutex;  /* Mutex for access to write operation queue */
  pthread_mutex_t writerMutex; /* Prevents multiple writer threads */

  pthread_cond_t queueSignal;  /* For waking up sleeping writer thread */
  pthread_cond_t emptySignal;  /* Notify when the write queue is empty */
  AsyncWrite *pQueueFirst;     /* Next write operation to be processed */
  AsyncWrite *pQueueLast;      /* Last write operation on the list */
  Hash aLock;                  /* Files locked */
  volatile int ioDelay;             /* Extra delay between write operations */
  volatile int writerHaltWhenIdle;  /* Writer thread halts when queue empty */
................................................................................
  int nName;                 /* Number of characters in zName */
  sqlite3_file *pBaseRead;   /* Read handle to the underlying Os file */
  sqlite3_file *pBaseWrite;  /* Write handle to the underlying Os file */
  AsyncFileLock lock;
  AsyncWrite close;
};

/*
** The following async_XXX functions are debugging wrappers around the
** corresponding pthread_XXX functions:
**
**     pthread_mutex_lock();
**     pthread_mutex_unlock();
**     pthread_mutex_trylock();
**     pthread_cond_wait();
**
** It is illegal to pass any mutex other than those stored in the
** following global variables of these functions.
**
**     async.queueMutex
**     async.writerMutex
**     async.lockMutex
**
** If NDEBUG is defined, these wrappers do nothing except call the 
** corresponding pthreads function. If NDEBUG is not defined, then the
** following variables are used to store the thread-id (as returned
** by pthread_self()) currently holding the mutex, or 0 otherwise:
**
**     asyncdebug.queueMutexHolder
**     asyncdebug.writerMutexHolder
**     asyncdebug.lockMutexHolder
**
** These variables are used by some assert() statements that verify
** the statements made in the "Deadlock Prevention" notes earlier
** in this file.
*/
#ifndef NDEBUG

static struct TestAsyncDebugData {
  pthread_t lockMutexHolder;
  pthread_t queueMutexHolder;
  pthread_t writerMutexHolder;
} asyncdebug = {0, 0, 0};

/*
** Wrapper around pthread_mutex_lock(). Checks that we have not violated
** the anti-deadlock rules (see "Deadlock prevention" above).
*/
static int async_mutex_lock(pthread_mutex_t *pMutex){
  int iIdx;
  int rc;
  pthread_mutex_t *aMutex = (pthread_mutex_t *)(&async);
  pthread_t *aHolder = (pthread_t *)(&asyncdebug);

  /* The code in this 'ifndef NDEBUG' block depends on a certain alignment
   * of the variables in TestAsyncStaticData and TestAsyncDebugData. The
   * following assert() statements check that this has not been changed.
   *
   * Really, these only need to be run once at startup time.
   */
  assert(&(aMutex[0])==&async.lockMutex);
  assert(&(aMutex[1])==&async.queueMutex);
  assert(&(aMutex[2])==&async.writerMutex);
  assert(&(aHolder[0])==&asyncdebug.lockMutexHolder);
  assert(&(aHolder[1])==&asyncdebug.queueMutexHolder);
  assert(&(aHolder[2])==&asyncdebug.writerMutexHolder);

  assert( pthread_self()!=0 );

  for(iIdx=0; iIdx<3; iIdx++){
    if( pMutex==&aMutex[iIdx] ) break;

    /* This is the key assert(). Here we are checking that if the caller
     * is trying to block on async.writerMutex, neither of the other two
     * mutex are held. If the caller is trying to block on async.queueMutex,
     * lockMutex is not held.
     */
    assert(!pthread_equal(aHolder[iIdx], pthread_self()));
  }
  assert(iIdx<3);

  rc = pthread_mutex_lock(pMutex);
  if( rc==0 ){
    assert(aHolder[iIdx]==0);
    aHolder[iIdx] = pthread_self();
  }
  return rc;
}

/*
** Wrapper around pthread_mutex_unlock().
*/
static int async_mutex_unlock(pthread_mutex_t *pMutex){
  int iIdx;
  int rc;
  pthread_mutex_t *aMutex = (pthread_mutex_t *)(&async);
  pthread_t *aHolder = (pthread_t *)(&asyncdebug);

  for(iIdx=0; iIdx<3; iIdx++){
    if( pMutex==&aMutex[iIdx] ) break;
  }
  assert(iIdx<3);

  assert(pthread_equal(aHolder[iIdx], pthread_self()));
  aHolder[iIdx] = 0;
  rc = pthread_mutex_unlock(pMutex);
  assert(rc==0);

  return 0;
}

/*
** Wrapper around pthread_mutex_trylock().
*/
static int async_mutex_trylock(pthread_mutex_t *pMutex){
  int iIdx;
  int rc;
  pthread_mutex_t *aMutex = (pthread_mutex_t *)(&async);
  pthread_t *aHolder = (pthread_t *)(&asyncdebug);

  for(iIdx=0; iIdx<3; iIdx++){
    if( pMutex==&aMutex[iIdx] ) break;
  }
  assert(iIdx<3);

  rc = pthread_mutex_trylock(pMutex);
  if( rc==0 ){
    assert(aHolder[iIdx]==0);
    aHolder[iIdx] = pthread_self();
  }
  return rc;
}

/*
** Wrapper around pthread_cond_wait().
*/
static int async_cond_wait(pthread_cond_t *pCond, pthread_mutex_t *pMutex){
  int iIdx;
  int rc;
  pthread_mutex_t *aMutex = (pthread_mutex_t *)(&async);
  pthread_t *aHolder = (pthread_t *)(&asyncdebug);

  for(iIdx=0; iIdx<3; iIdx++){
    if( pMutex==&aMutex[iIdx] ) break;
  }
  assert(iIdx<3);

  assert(pthread_equal(aHolder[iIdx],pthread_self()));
  aHolder[iIdx] = 0;
  rc = pthread_cond_wait(pCond, pMutex);
  if( rc==0 ){
    aHolder[iIdx] = pthread_self();
  }
  return rc;
}

/* Call our async_XX wrappers instead of selected pthread_XX functions */
#define pthread_mutex_lock    async_mutex_lock
#define pthread_mutex_unlock  async_mutex_unlock
#define pthread_mutex_trylock async_mutex_trylock
#define pthread_cond_wait     async_cond_wait

#endif   /* !defined(NDEBUG) */

/*
** Add an entry to the end of the global write-op list. pWrite should point 
** to an AsyncWrite structure allocated using sqlite3_malloc().  The writer
** thread will call sqlite3_free() to free the structure after the specified
** operation has been completed.
**
** Once an AsyncWrite structure has been added to the list, it becomes the
................................................................................
    }
  }

  if( rc==SQLITE_OK ){
    HashElem *pElem;
    p->pMethod = &async_methods;
    p->pData = pData;


    /* Link AsyncFileData.lock into the linked list of 
    ** AsyncFileLock structures for this file.
    */
    pData->lock.pNext = pLock->pList;
    pLock->pList = &pData->lock;

................................................................................
  }else{
    sqlite3OsClose(pData->pBaseRead);
    sqlite3OsClose(pData->pBaseWrite);
    sqlite3_free(pData);
  }

  pthread_mutex_unlock(&async.lockMutex);

  if( rc==SQLITE_OK ){
    incrOpenFileCount();
  }

  if( rc==SQLITE_OK && isExclusive ){
    rc = addNewAsyncWrite(pData, ASYNC_OPENEXCLUSIVE, (i64)flags, 0, 0);
    if( rc==SQLITE_OK ){
      if( pOutFlags ) *pOutFlags = flags;
    }else{
      pthread_mutex_lock(&async.lockMutex);
................................................................................
  Tcl_Interp *interp,
  int objc,
  Tcl_Obj *CONST objv[]
){
  pthread_t x;
  int rc;
  volatile int isStarted = 0;
  rc = pthread_create(&x, 0, asyncWriterThread, (void *)&isStarted);
  if( rc ){
    Tcl_AppendResult(interp, "failed to create the thread", 0);
    return TCL_ERROR;
  }
  pthread_detach(x);
  while( isStarted==0 ){
    sched_yield();