Index: src/pager.c ================================================================== --- src/pager.c +++ src/pager.c @@ -706,10 +706,11 @@ Wal *pWal; /* Write-ahead log used by "journal_mode=wal" */ char *zWal; /* File name for write-ahead log */ #endif #ifdef SQLITE_SERVER_EDITION Server *pServer; + ServerPage *pServerPage; #endif }; /* ** Indexes for use with Pager.aStat[]. The Pager.aStat[] array contains @@ -1787,10 +1788,22 @@ } } return rc; } +#ifdef SQLITE_SERVER_EDITION +static void pagerFreeServerPage(Pager *pPager){ + ServerPage *pPg; + ServerPage *pNext; + for(pPg=pPager->pServerPage; pPg; pPg=pNext){ + pNext = pPg->pNext; + sqlite3_free(pPg); + } + pPager->pServerPage = 0; +} +#endif + /* ** This function is a no-op if the pager is in exclusive mode and not ** in the ERROR state. Otherwise, it switches the pager to PAGER_OPEN ** state. ** @@ -1817,10 +1830,11 @@ pPager->pInJournal = 0; releaseAllSavepoints(pPager); #ifdef SQLITE_SERVER_EDITION if( pagerIsServer(pPager) ){ + pagerFreeServerPage(pPager); sqlite3ServerEnd(pPager->pServer); pPager->eState = PAGER_OPEN; }else #endif if( pagerUseWal(pPager) ){ @@ -4356,10 +4370,18 @@ /* This function is only called for rollback pagers in WRITER_DBMOD state. */ assert( !pagerUseWal(pPager) ); assert( pPager->tempFile || pPager->eState==PAGER_WRITER_DBMOD ); assert( pPager->eLock==EXCLUSIVE_LOCK ); assert( isOpen(pPager->fd) || pList->pDirty==0 ); + +#ifdef SQLITE_SERVER_EDITION + if( pagerIsServer(pPager) ){ + rc = sqlite3ServerPreCommit(pPager->pServer, pPager->pServerPage); + pPager->pServerPage = 0; + if( rc!=SQLITE_OK ) return rc; + } +#endif /* If the file is a temp-file has not yet been opened, open it now. It ** is not possible for rc to be other than SQLITE_OK if this branch ** is taken, as pager_wait_on_lock() is a no-op for temp-files. */ @@ -4533,10 +4555,12 @@ ** is returned by sqlite3PcacheMakeClean() is not called. 
*/ static int pagerStress(void *p, PgHdr *pPg){ Pager *pPager = (Pager *)p; int rc = SQLITE_OK; + + if( pagerIsServer(pPager) ) return SQLITE_OK; assert( pPg->pPager==pPager ); assert( pPg->flags&PGHDR_DIRTY ); /* The doNotSpill NOSYNC bit is set during times when doing a sync of @@ -5906,10 +5930,24 @@ Pager *pPager = pPg->pPager; int rc; u32 cksum; char *pData2; i64 iOff = pPager->journalOff; + +#ifdef SQLITE_SERVER_EDITION + if( pagerIsServer(pPager) ){ + int nByte = sizeof(ServerPage) + pPager->pageSize; + ServerPage *p = (ServerPage*)sqlite3_malloc(nByte); + if( !p ) return SQLITE_NOMEM_BKPT; + memset(p, 0, sizeof(ServerPage)); + p->aData = (u8*)&p[1]; + p->nData = pPager->pageSize; + p->pgno = pPg->pgno; + p->pNext = pPager->pServerPage; + pPager->pServerPage = p; + } +#endif /* We should never write to the journal file the page that ** contains the database locks. The following assert verifies ** that we do not. */ assert( pPg->pgno!=PAGER_MJ_PGNO(pPager) ); Index: src/server.c ================================================================== --- src/server.c +++ src/server.c @@ -11,42 +11,62 @@ ************************************************************************* */ #include "sqliteInt.h" +#ifdef SQLITE_SERVER_EDITION + /* ** Page-locking slot format: ** ** Assuming HMA_MAX_TRANSACTIONID is set to 16. ** ** The least-significant 16 bits are used for read locks. When a read ** lock is taken, the client sets the bit associated with its ** transaction-id. ** -** The next 8 bits are set to the number of transient-read locks -** currently held on the page. -** ** The next 5 bits are set to 0 if no client currently holds a write ** lock. Or to (transaction-id + 1) if a write lock is held. +** +** The next 8 bits are set to the number of transient-read locks +** currently held on the page. 
*/ +#define HMA_SLOT_RL_BITS 16 /* bits for Read Locks */ +#define HMA_SLOT_WL_BITS 5 /* bits for Write Locks */ +#define HMA_SLOT_TR_BITS 8 /* bits for Transient Reader locks */ -#ifdef SQLITE_SERVER_EDITION +#define HMA_SLOT_RLWL_BITS (HMA_SLOT_RL_BITS + HMA_SLOT_WL_BITS) + + +#define HMA_SLOT_RL_MASK ((1 << HMA_SLOT_RL_BITS)-1) +#define HMA_SLOT_WL_MASK (((1 << HMA_SLOT_WL_BITS)-1) << HMA_SLOT_RL_BITS) +#define HMA_SLOT_TR_MASK (((1 << HMA_SLOT_TR_BITS)-1) << HMA_SLOT_RLWL_BITS) + /* Number of page-locking slots */ #define HMA_PAGELOCK_SLOTS (256*1024) /* Maximum concurrent read/write transactions */ #define HMA_MAX_TRANSACTIONID 16 + +#define HMA_HASH_SIZE 512 + /* ** The argument to this macro is the value of a locking slot. It returns ** -1 if no client currently holds the write lock, or the transaction-id ** of the locker otherwise. */ -#define slotGetWriter(v) (((int)((v) >> HMA_MAX_TRANSACTIONID) & 0x1f) -1) +#define slotGetWriter(v) ((((int)(v)&HMA_SLOT_WL_MASK) >> HMA_SLOT_RL_BITS) - 1) -#define slotReaderMask(v) ((v) & ((1 << HMA_MAX_TRANSACTIONID)-1)) +/* +** The argument to this macro is the value of a locking slot. This macro +** returns the number of slow-reader (transient-read) locks held on the page. 
+*/ +#define slotGetSlowReaders(v) (((v) & HMA_SLOT_TR_MASK) >> HMA_SLOT_RLWL_BITS) + +#define slotReaderMask(v) ((v) & HMA_SLOT_RL_MASK) #include "unistd.h" #include "fcntl.h" #include "sys/mman.h" #include "sys/types.h" @@ -80,10 +100,17 @@ ServerDb *pNext; /* Next db in this process */ sqlite3_vfs *pVfs; ServerJournal aJrnl[HMA_MAX_TRANSACTIONID]; u8 *aJrnlFdSpace; + + int iNextCommit; /* Commit id for next pre-commit call */ + Server *pCommit; /* List of connections currently committing */ + Server *pReader; /* Connections in slower-reader transaction */ + ServerPage *pPgFirst; /* First (oldest) in list of pages */ + ServerPage *pPgLast; /* Last (newest) in list of pages */ + ServerPage *apPg[HMA_HASH_SIZE]; /* Hash table; bucket is pgno % HMA_HASH_SIZE */ }; /* ** Once instance for each client connection open on a server mode database ** in this process. @@ -90,13 +117,15 @@ */ struct Server { ServerDb *pDb; /* Database object */ Pager *pPager; /* Associated pager object */ int iTransId; /* Current transaction id (or -1) */ + int iCommitId; /* Current commit id (or 0) */ int nAlloc; /* Allocated size of aLock[] array */ int nLock; /* Number of entries in aLock[] */ u32 *aLock; /* Mapped lock file */ + Server *pNext; /* Next in pCommit or pReader list */ }; #define SERVER_WRITE_LOCK 3 #define SERVER_READ_LOCK 2 #define SERVER_NO_LOCK 1 @@ -140,10 +169,11 @@ sqlite3_free(p->aSlot); sqlite3_free(p); p = 0; }else{ p->nClient = 1; + p->iNextCommit = 1; p->aFileId[0] = aFileId[0]; p->aFileId[1] = aFileId[1]; p->pNext = g_server.pDb; g_server.pDb = p; } @@ -350,18 +380,133 @@ /* ** End a transaction (and release all locks). */ int sqlite3ServerEnd(Server *p){ int rc = SQLITE_OK; + Server **pp; ServerDb *pDb = p->pDb; + ServerPage *pFree = 0; + ServerPage *pPg = 0; sqlite3_mutex_enter(pDb->mutex); serverReleaseLocks(p); + + /* Clear the bit in the transaction mask. */ pDb->transmask &= ~((u32)1 << p->iTransId); + + /* If this connection is in the committers list, remove it. 
*/ + for(pp=&pDb->pCommit; *pp; pp = &((*pp)->pNext)){ + if( *pp==p ){ + *pp = p->pNext; p->pNext = 0; /* No longer on the pCommit list */ + break; + } + } + + /* See if it is possible to free any ServerPage records. If so, remove + ** them from the linked list and hash table, but do not call sqlite3_free() + ** on them until the mutex has been released. */ + if( pDb->pPgFirst ){ + Server *pIter; + int iOldest = 0x7FFFFFFF; + for(pIter=pDb->pReader; pIter; pIter=pIter->pNext){ + iOldest = MIN(iOldest, pIter->iCommitId); + } + for(pIter=pDb->pCommit; pIter; pIter=pIter->pNext){ + iOldest = MIN(iOldest, pIter->iCommitId); + } + + pFree = pDb->pPgFirst; + for(pPg=pDb->pPgFirst; pPg && pPg->iCommitId<iOldest; pPg=pPg->pNext){ + if( pPg->pHashPrev ){ + pPg->pHashPrev->pHashNext = pPg->pHashNext; + }else{ + int iHash = pPg->pgno % HMA_HASH_SIZE; + assert( pDb->apPg[iHash]==pPg ); + pDb->apPg[iHash] = pPg->pHashNext; + } + if( pPg->pHashNext ){ + pPg->pHashNext->pHashPrev = pPg->pHashPrev; + } + } + if( pPg==0 ){ + pDb->pPgFirst = pDb->pPgLast = 0; + }else{ + pDb->pPgFirst = pPg; + } + } sqlite3_mutex_leave(pDb->mutex); + + /* Call sqlite3_free() on any pages that were unlinked from the hash + ** table above. */ + while( pFree && pFree!=pPg ){ + ServerPage *pNext = pFree->pNext; + sqlite3_free(pFree); + pFree = pNext; + } + p->iTransId = -1; + p->iCommitId = 0; + return rc; +} + +int sqlite3ServerPreCommit(Server *p, ServerPage *pPg){ + ServerDb *pDb = p->pDb; + int rc = SQLITE_OK; + ServerPage *pIter; + ServerPage *pNext; + + if( pPg==0 ) return SQLITE_OK; + + sqlite3_mutex_enter(pDb->mutex); + + /* Assign a commit id to this transaction */ + assert( p->iCommitId==0 ); + p->iCommitId = pDb->iNextCommit++; + + /* Iterate through all pages. For each: + ** + ** 1. Set the iCommitId field. + ** 2. Add the page to the hash table. + ** 3. Wait until all slow-reader locks have cleared. 
+ */ + for(pIter=pPg; pIter; pIter=pIter->pNext){ + u32 *pSlot = &pDb->aSlot[pIter->pgno % HMA_PAGELOCK_SLOTS]; + int iHash = pIter->pgno % HMA_HASH_SIZE; + pIter->iCommitId = p->iCommitId; + pIter->pHashNext = pDb->apPg[iHash]; + if( pIter->pHashNext ){ + pIter->pHashNext->pHashPrev = pIter; + } + pDb->apPg[iHash] = pIter; + + /* TODO: Something better than this busy-wait on slow-reader locks! */ + while( slotGetSlowReaders(*pSlot)>0 ){ + sqlite3_mutex_leave(pDb->mutex); + sqlite3_mutex_enter(pDb->mutex); + } + + /* If pIter is the last element in the list, append the new list to + ** the ServerDb.pPgFirst/pPgLast list at this point. */ + if( pIter->pNext==0 ){ + if( pDb->pPgLast ){ + assert( pDb->pPgFirst ); + pDb->pPgLast->pNext = pPg; + }else{ + assert( pDb->pPgFirst==0 ); + pDb->pPgFirst = pPg; + } + pDb->pPgLast = pIter; + } + } + + /* Add this connection to the list of current committers */ + assert( p->pNext==0 ); + p->pNext = pDb->pCommit; + pDb->pCommit = p; + + sqlite3_mutex_leave(pDb->mutex); return rc; } /* ** Release all write-locks. @@ -382,10 +527,11 @@ ServerDb *pDb = p->pDb; int iWriter; int bSkip = 0; u32 *pSlot; + assert( p->iTransId>=0 ); assert( p->nLock<=p->nAlloc ); if( p->nLock==p->nAlloc ){ int nNew = p->nLock ? 
p->nLock*2 : 256; u32 *aNew = sqlite3_realloc(p->aLock, nNew*sizeof(u32)); if( aNew==0 ) return SQLITE_NOMEM_BKPT; @@ -392,14 +538,18 @@ memset(&aNew[p->nLock], 0, sizeof(u32) * (nNew - p->nLock)); p->nAlloc = nNew; p->aLock = aNew; } - assert( p->iTransId>=0 ); - sqlite3_mutex_enter(pDb->mutex); + pSlot = &pDb->aSlot[pgno % HMA_PAGELOCK_SLOTS]; + assert( slotGetWriter(*pSlot)<0 + || slotReaderMask(*pSlot)==0 + || slotReaderMask(*pSlot)==(1 << slotGetWriter(*pSlot)) + ); + iWriter = slotGetWriter(*pSlot); if( iWriter==p->iTransId || (bWrite==0 && (*pSlot & (1<<p->iTransId))) ){ bSkip = 1; }else if( iWriter>=0 ){ rc = SQLITE_BUSY_DEADLOCK; Index: src/server.h ================================================================== --- src/server.h +++ src/server.h @@ -17,20 +17,33 @@ #define SQLITE_SERVER_H typedef struct Server Server; -int sqlite3ServerConnect(Pager *pPager, Server **ppOut); +typedef struct ServerPage ServerPage; +struct ServerPage { + Pgno pgno; /* Page number for this record */ + int nData; /* Size of aData[] in bytes */ + u8 *aData; /* Buffer of nData bytes */ + ServerPage *pNext; /* Next page in list */ + + int iCommitId; /* Commit id set by sqlite3ServerPreCommit() */ + ServerPage *pHashNext; /* Next entry in ServerDb.apPg[] bucket */ + ServerPage *pHashPrev; /* Previous entry in ServerDb.apPg[] bucket */ +}; +int sqlite3ServerConnect(Pager *pPager, Server **ppOut); void sqlite3ServerDisconnect(Server *p, sqlite3_file *dbfd); int sqlite3ServerBegin(Server *p); +int sqlite3ServerPreCommit(Server*, ServerPage*); int sqlite3ServerEnd(Server *p); + int sqlite3ServerReleaseWriteLocks(Server *p); int sqlite3ServerLock(Server *p, Pgno pgno, int bWrite, int bBlock); int sqlite3ServerHasLock(Server *p, Pgno pgno, int bWrite); #endif /* SQLITE_SERVER_H */ - #endif /* SQLITE_SERVER_EDITION */ +