Many hyperlinks are disabled. Use anonymous login to enable hyperlinks.
Overview
| Comment: | Do not search for locks to clear when connecting to a db in multi-process mode unless it looks like the previous user of the client-id crashed. |
|---|---|
| Downloads: | Tarball, ZIP archive |
| Timelines: | family, ancestors, descendants, both, server-process-edition |
| Files: | files, file ages, folders |
| SHA3-256: | 66fb9e1cb479f1e764f1606f041bd97f |
| User & Date: | dan 2017-08-18 16:04:40.867 |
Context
2017-08-18
| ||
18:55 | Add tests to this branch. (check-in: abb6e076c8 user: dan tags: server-process-edition) | |
16:04 | Do not search for locks to clear when connecting to a db in multi-process mode unless it looks like the previous user of the client-id crashed. (check-in: 66fb9e1cb4 user: dan tags: server-process-edition) | |
2017-08-17
| ||
19:32 | Add support for crash recovery in multi-process mode. And add test cases for the same. (check-in: a8115f95e8 user: dan tags: server-process-edition) | |
Changes
Changes to src/server.c.
︙ | ︙ | |||
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 | ** The argument to this macro is the value of a locking slot. This macro ** returns the current number of slow reader clients reading the page. */ #define slotGetSlowReaders(v) (((v) & HMA_SLOT_TR_MASK) >> HMA_SLOT_RLWL_BITS) #define slotReaderMask(v) ((v) & HMA_SLOT_RL_MASK) /* ** Atomic CAS primitive used in multi-process mode. Equivalent to: ** ** int serverCompareAndSwap(u32 *ptr, u32 oldval, u32 newval){ ** if( *ptr==oldval ){ ** *ptr = newval; ** return 1; ** } ** return 0; ** } */ #define serverCompareAndSwap(ptr,oldval,newval) \ __sync_bool_compare_and_swap(ptr,oldval,newval) typedef struct ServerDb ServerDb; typedef struct ServerJournal ServerJournal; | > < < < < < > > > | > > > > < < | > | 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 | ** The argument to this macro is the value of a locking slot. This macro ** returns the current number of slow reader clients reading the page. */ #define slotGetSlowReaders(v) (((v) & HMA_SLOT_TR_MASK) >> HMA_SLOT_RLWL_BITS) #define slotReaderMask(v) ((v) & HMA_SLOT_RL_MASK) #define fdOpen(pFd) ((pFd)->pMethods!=0) /* ** Atomic CAS primitive used in multi-process mode. Equivalent to: ** ** int serverCompareAndSwap(u32 *ptr, u32 oldval, u32 newval){ ** if( *ptr==oldval ){ ** *ptr = newval; ** return 1; ** } ** return 0; ** } */ #define serverCompareAndSwap(ptr,oldval,newval) \ __sync_bool_compare_and_swap(ptr,oldval,newval) typedef struct ServerDb ServerDb; typedef struct ServerJournal ServerJournal; struct ServerJournal { char *zJournal; sqlite3_file *jfd; }; /* ** There is one instance of the following structure for each distinct ** database file opened in server mode by this process. 
*/ struct ServerDb { i64 aFileId[2]; /* Opaque VFS file-id */ ServerDb *pNext; /* Next db in this process */ int nClient; /* Current number of clients */ sqlite3_mutex *mutex; /* Non-recursive mutex */ /* Variables above this point are protected by the global mutex - ** serverEnterMutex()/LeaveMutex(). Those below this point are ** protected by the ServerDb.mutex mutex. */ int bInit; /* True once initialized */ u32 transmask; /* Bitmask of taken transaction ids */ u32 *aSlot; /* Array of page locking slots */ sqlite3_vfs *pVfs; ServerJournal aJrnl[HMA_MAX_TRANSACTIONID]; u8 *aJrnlFdSpace; void *pServerShm; /* SHMOPEN handle (multi-process only) */ u32 *aClient; /* Client "transaction active" flags */ int iNextCommit; /* Commit id for next pre-commit call */ Server *pCommit; /* List of connections currently commiting */ Server *pReader; /* Connections in slower-reader transaction */ ServerPage *pPgFirst; /* First (oldest) in list of pages */ ServerPage *pPgLast; /* Last (newest) in list of pages */ ServerPage *apPg[HMA_HASH_SIZE];/* Hash table of "old" page data */ |
︙ | ︙ | |||
133 134 135 136 137 138 139 140 141 142 143 144 145 146 | int iTransId; /* Current transaction id (or -1) */ int iCommitId; /* Current commit id (or 0) */ int nAlloc; /* Allocated size of aLock[] array */ int nLock; /* Number of entries in aLock[] */ u32 *aLock; /* Array of held locks */ Server *pNext; /* Next in pCommit or pReader list */ }; struct ServerFcntlArg { void *h; /* Handle from SHMOPEN */ void *p; /* Mapping */ int i1; /* Integer value 1 */ int i2; /* Integer value 2 */ }; | > > > > > > | 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 | int iTransId; /* Current transaction id (or -1) */ int iCommitId; /* Current commit id (or 0) */ int nAlloc; /* Allocated size of aLock[] array */ int nLock; /* Number of entries in aLock[] */ u32 *aLock; /* Array of held locks */ Server *pNext; /* Next in pCommit or pReader list */ }; struct ServerGlobal { ServerDb *pDb; /* Linked list of all ServerDb objects */ }; static struct ServerGlobal g_server; struct ServerFcntlArg { void *h; /* Handle from SHMOPEN */ void *p; /* Mapping */ int i1; /* Integer value 1 */ int i2; /* Integer value 2 */ }; |
︙ | ︙ | |||
162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 | */ static void serverEnterMutex(void){ sqlite3_mutex_enter(sqlite3MutexAlloc(SQLITE_MUTEX_STATIC_APP1)); } static void serverLeaveMutex(void){ sqlite3_mutex_leave(sqlite3MutexAlloc(SQLITE_MUTEX_STATIC_APP1)); } static void serverAssertMutexHeld(void){ assert( sqlite3_mutex_held(sqlite3MutexAlloc(SQLITE_MUTEX_STATIC_APP1)) ); } /* ** Locate the ServerDb object shared by all connections to the db identified ** by aFileId[2], increment its ref count and set pNew->pDb to point to it. ** In this context "locate" may mean to find an existing object or to ** allocate a new one. */ | > > | 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 | */ static void serverEnterMutex(void){ sqlite3_mutex_enter(sqlite3MutexAlloc(SQLITE_MUTEX_STATIC_APP1)); } static void serverLeaveMutex(void){ sqlite3_mutex_leave(sqlite3MutexAlloc(SQLITE_MUTEX_STATIC_APP1)); } #if 0 static void serverAssertMutexHeld(void){ assert( sqlite3_mutex_held(sqlite3MutexAlloc(SQLITE_MUTEX_STATIC_APP1)) ); } #endif /* ** Locate the ServerDb object shared by all connections to the db identified ** by aFileId[2], increment its ref count and set pNew->pDb to point to it. ** In this context "locate" may mean to find an existing object or to ** allocate a new one. */ |
︙ | ︙ | |||
216 217 218 219 220 221 222 | static int serverClientRollback(Server *p, int iClient){ ServerDb *pDb = p->pDb; ServerJournal *pJ = &pDb->aJrnl[iClient]; int bExist = 1; int rc = SQLITE_OK; | | | 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 | static int serverClientRollback(Server *p, int iClient){ ServerDb *pDb = p->pDb; ServerJournal *pJ = &pDb->aJrnl[iClient]; int bExist = 1; int rc = SQLITE_OK; if( fdOpen(pJ->jfd)==0 ){ bExist = 0; rc = sqlite3OsAccess(pDb->pVfs, pJ->zJournal, SQLITE_ACCESS_EXISTS,&bExist); if( bExist && rc==SQLITE_OK ){ int flags = SQLITE_OPEN_READWRITE|SQLITE_OPEN_MAIN_JOURNAL; rc = sqlite3OsOpen(pDb->pVfs, pJ->zJournal, pJ->jfd, flags, &flags); } } |
︙ | ︙ | |||
244 245 246 247 248 249 250 251 252 253 | Server *p, sqlite3_file *dbfd, int bDelete ){ ServerDb *pDb = p->pDb; int i; for(i=0; i<HMA_MAX_TRANSACTIONID; i++){ ServerJournal *pJ = &pDb->aJrnl[i]; | > | | > | 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 | Server *p, sqlite3_file *dbfd, int bDelete ){ ServerDb *pDb = p->pDb; int i; assert( pDb->pServerShm || bDelete ); for(i=0; i<HMA_MAX_TRANSACTIONID; i++){ ServerJournal *pJ = &pDb->aJrnl[i]; if( bDelete && (pDb->pServerShm || fdOpen(pJ->jfd)) ){ int rc = serverClientRollback(p, i); if( rc!=SQLITE_OK ) bDelete = 0; } if( fdOpen(pJ->jfd) ){ sqlite3OsClose(pJ->jfd); if( bDelete ) sqlite3OsDelete(pDb->pVfs, pJ->zJournal, 0); } sqlite3_free(pJ->zJournal); } memset(pDb->aJrnl, 0, sizeof(ServerJournal)*HMA_MAX_TRANSACTIONID); if( pDb->aJrnlFdSpace ){ sqlite3_free(pDb->aJrnlFdSpace); pDb->aJrnlFdSpace = 0; |
︙ | ︙ | |||
277 278 279 280 281 282 283 284 285 286 287 288 289 290 | }else{ sqlite3_free(pDb->aSlot); } pDb->aSlot = 0; pDb->bInit = 0; } static void serverClientUnlock(Server *p, int iClient){ ServerDb *pDb = p->pDb; int i; assert( pDb->pServerShm ); for(i=0; i<HMA_PAGELOCK_SLOTS; i++){ u32 *pSlot = &pDb->aSlot[i]; | > > > > > > > > | 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 | }else{ sqlite3_free(pDb->aSlot); } pDb->aSlot = 0; pDb->bInit = 0; } /* ** Clear all page locks held by client iClient. The handle passed as the ** first argument may or may not correspond to client iClient. ** ** This function is called in multi-process mode as part of restoring the ** system state after it has been detected that client iClient may have ** failed mid transaction. It is never called for a single process system. */ static void serverClientUnlock(Server *p, int iClient){ ServerDb *pDb = p->pDb; int i; assert( pDb->pServerShm ); for(i=0; i<HMA_PAGELOCK_SLOTS; i++){ u32 *pSlot = &pDb->aSlot[i]; |
︙ | ︙ | |||
322 323 324 325 326 327 328 | if( pDb->aJrnlFdSpace==0 ){ rc = SQLITE_NOMEM_BKPT; }else{ if( eServer==2 ){ ServerFcntlArg arg; arg.h = 0; arg.p = 0; | | > | 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 | if( pDb->aJrnlFdSpace==0 ){ rc = SQLITE_NOMEM_BKPT; }else{ if( eServer==2 ){ ServerFcntlArg arg; arg.h = 0; arg.p = 0; arg.i1 = sizeof(u32)*(HMA_PAGELOCK_SLOTS + HMA_MAX_TRANSACTIONID); arg.i2 = 0; rc = sqlite3OsFileControl(dbfd, SQLITE_FCNTL_SERVER_SHMOPEN, (void*)&arg); if( rc==SQLITE_OK ){ pDb->aSlot = (u32*)arg.p; pDb->aClient = &pDb->aSlot[HMA_PAGELOCK_SLOTS]; pDb->pServerShm = arg.h; bRollback = arg.i2; } }else{ pDb->aSlot = (u32*)sqlite3MallocZero(sizeof(u32)*HMA_PAGELOCK_SLOTS); if( pDb->aSlot==0 ) rc = SQLITE_NOMEM_BKPT; bRollback = 1; |
︙ | ︙ | |||
495 496 497 498 499 500 501 | sqlite3_mutex_leave(pNew->pDb->mutex); /* If this is a multi-process database, it may be that the previous ** user of client-id pNew->iTransId crashed mid transaction. Roll ** back any hot journal file in the file-system and release ** page locks held by any crashed process. TODO: The call to ** serverClientUnlock() is expensive. */ | | | 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 | sqlite3_mutex_leave(pNew->pDb->mutex); /* If this is a multi-process database, it may be that the previous ** user of client-id pNew->iTransId crashed mid transaction. Roll ** back any hot journal file in the file-system and release ** page locks held by any crashed process. TODO: The call to ** serverClientUnlock() is expensive. */ if( rc==SQLITE_OK && pDb->pServerShm && pDb->aClient[pNew->iTransId] ){ serverClientUnlock(pNew, pNew->iTransId); rc = serverClientRollback(pNew, pNew->iTransId); } } }else{ rc = SQLITE_NOMEM_BKPT; } |
︙ | ︙ | |||
527 528 529 530 531 532 533 534 535 536 537 538 539 540 | if( p->eTrans==SERVER_TRANS_NONE ){ ServerDb *pDb = p->pDb; u32 t; assert( p->pNext==0 ); if( pDb->pServerShm ){ p->eTrans = SERVER_TRANS_READWRITE; }else{ assert( p->iTransId<0 ); sqlite3_mutex_enter(pDb->mutex); if( bReadonly ){ Server *pIter; p->iCommitId = pDb->iNextCommit; for(pIter=pDb->pCommit; pIter; pIter=pIter->pNext){ | > | 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 | if( p->eTrans==SERVER_TRANS_NONE ){ ServerDb *pDb = p->pDb; u32 t; assert( p->pNext==0 ); if( pDb->pServerShm ){ p->eTrans = SERVER_TRANS_READWRITE; pDb->aClient[p->iTransId] = 1; }else{ assert( p->iTransId<0 ); sqlite3_mutex_enter(pDb->mutex); if( bReadonly ){ Server *pIter; p->iCommitId = pDb->iNextCommit; for(pIter=pDb->pCommit; pIter; pIter=pIter->pNext){ |
︙ | ︙ | |||
674 675 676 677 678 679 680 | pDb->pPgFirst = pPg; } } sqlite3_mutex_leave(pDb->mutex); p->pNext = 0; | < > > | 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 | pDb->pPgFirst = pPg; } } sqlite3_mutex_leave(pDb->mutex); p->pNext = 0; p->iTransId = -1; p->iCommitId = 0; } /* ** End a transaction (and release all locks). */ int sqlite3ServerEnd(Server *p){ if( p->eTrans!=SERVER_TRANS_NONE ){ if( p->pDb->pServerShm ){ serverReleaseLocks(p); p->pDb->aClient[p->iTransId] = 0; }else{ serverEndSingle(p); } p->eTrans = SERVER_TRANS_NONE; } return SQLITE_OK; } int sqlite3ServerPreCommit(Server *p, ServerPage *pPg){ ServerDb *pDb = p->pDb; int rc = SQLITE_OK; |
︙ | ︙ |