/* ** 2017 June 7 ** ** The author disclaims copyright to this source code. In place of ** a legal notice, here is a blessing: ** ** May you do good and not evil. ** May you find forgiveness for yourself and forgive others. ** May you share freely, never taking more than you give. ** ************************************************************************* ** ** Simple multi-threaded server used for informal testing of concurrency ** between connections in different threads. Listens for tcp/ip connections ** on port 9999 of the 127.0.0.1 interface only. To build: ** ** gcc -g $(TOP)/tool/tserver.c sqlite3.o -lpthread -o tserver ** ** To run using "x.db" as the db file: ** ** ./tserver x.db ** ** To connect, open a client socket on port 9999 and start sending commands. ** Commands are either SQL - which must be terminated by a semi-colon, or ** dot-commands, which must be terminated by a newline. If an SQL statement ** is seen, it is prepared and added to an internal list. ** ** Dot-commands are: ** ** .list Display all SQL statements in the list. ** .quit Disconnect. ** .run Run all SQL statements in the list. ** .repeats N Configure the number of repeats per ".run". ** .seconds N Configure the number of seconds to ".run" for. ** .mutex_commit Add a "COMMIT" protected by a g.commit_mutex ** to the current SQL. ** .stop Stop the tserver process - exit(0). ** ** Example input: ** ** BEGIN; ** INSERT INTO t1 VALUES(randomblob(10), randomblob(100)); ** INSERT INTO t1 VALUES(randomblob(10), randomblob(100)); ** INSERT INTO t1 VALUES(randomblob(10), randomblob(100)); ** COMMIT; ** .repeats 100000 ** .run ** */ #define TSERVER_PORTNUMBER 9999 #include #include #include #include #include #include #include #include #include #include #include #include "sqlite3.h" #define TSERVER_DEFAULT_CHECKPOINT_THRESHOLD 3900 /* Global variables */ struct TserverGlobal { char *zDatabaseName; /* Database used by this server */ char *zVfs; sqlite3_mutex *commit_mutex; sqlite3 *db; /* Global db handle */ /* The following use native pthreads instead of a portable interface. This ** is because a condition variable, as well as a mutex, is required. */ pthread_mutex_t ckpt_mutex; pthread_cond_t ckpt_cond; int nThreshold; /* Checkpoint when wal is this large */ int bCkptRequired; /* True if wal checkpoint is required */ int nRun; /* Number of clients in ".run" */ int nWait; /* Number of clients waiting on ckpt_cond */ }; static struct TserverGlobal g = {0}; typedef struct ClientSql ClientSql; struct ClientSql { sqlite3_stmt *pStmt; int bMutex; }; typedef struct ClientCtx ClientCtx; struct ClientCtx { sqlite3 *db; /* Database handle for this client */ int fd; /* Client fd */ int nRepeat; /* Number of times to repeat SQL */ int nSecond; /* Number of seconds to run for */ ClientSql *aPrepare; /* Array of prepared statements */ int nPrepare; /* Valid size of apPrepare[] */ int nAlloc; /* Allocated size of apPrepare[] */ int nClientThreshold; /* Threshold for checkpointing */ int bClientCkptRequired; /* True to do a checkpoint */ }; static int is_eol(int i){ return (i=='\n' || i=='\r'); } static int is_whitespace(int i){ return (i==' ' || i=='\t' || is_eol(i)); } /* ** Implementation of SQL scalar function usleep(). */ static void usleepFunc( sqlite3_context *context, int argc, sqlite3_value **argv ){ int nUs; sqlite3_vfs *pVfs = (sqlite3_vfs*)sqlite3_user_data(context); assert( argc==1 ); nUs = sqlite3_value_int64(argv[0]); pVfs->xSleep(pVfs, nUs); } static void trim_string(const char **pzStr, int *pnStr){ const char *zStr = *pzStr; int nStr = *pnStr; while( nStr>0 && is_whitespace(zStr[0]) ){ zStr++; nStr--; } while( nStr>0 && is_whitespace(zStr[nStr-1]) ){ nStr--; } *pzStr = zStr; *pnStr = nStr; } static int send_message(ClientCtx *p, const char *zFmt, ...){ char *zMsg; va_list ap; /* Vararg list */ va_start(ap, zFmt); int res = -1; zMsg = sqlite3_vmprintf(zFmt, ap); if( zMsg ){ res = write(p->fd, zMsg, strlen(zMsg)); } sqlite3_free(zMsg); va_end(ap); return (res<0); } static int handle_some_sql(ClientCtx *p, const char *zSql, int nSql){ const char *zTail = zSql; int nTail = nSql; int rc = SQLITE_OK; while( rc==SQLITE_OK ){ if( p->nPrepare>=p->nAlloc ){ int nByte = (p->nPrepare+32) * sizeof(ClientSql); ClientSql *aNew = sqlite3_realloc(p->aPrepare, nByte); if( aNew ){ memset(&aNew[p->nPrepare], 0, sizeof(ClientSql)*32); p->aPrepare = aNew; p->nAlloc = p->nPrepare+32; }else{ rc = SQLITE_NOMEM; break; } } rc = sqlite3_prepare_v2( p->db, zTail, nTail, &p->aPrepare[p->nPrepare].pStmt, &zTail ); if( rc!=SQLITE_OK ){ send_message(p, "error - %s (eec=%d)\n", sqlite3_errmsg(p->db), sqlite3_extended_errcode(p->db) ); rc = 1; break; } if( p->aPrepare[p->nPrepare].pStmt==0 ){ break; } p->nPrepare++; nTail = nSql - (zTail-zSql); rc = send_message(p, "ok (%d SQL statements)\n", p->nPrepare); } return rc; } static sqlite3_int64 get_timer(void){ struct timeval t; gettimeofday(&t, 0); return ((sqlite3_int64)t.tv_usec / 1000) + ((sqlite3_int64)t.tv_sec * 1000); } static void clear_sql(ClientCtx *p){ int j; for(j=0; jnPrepare; j++){ sqlite3_finalize(p->aPrepare[j].pStmt); } p->nPrepare = 0; } /* ** The sqlite3_wal_hook() callback used by all client database connections. */ static int clientWalHook(void *pArg, sqlite3 *db, const char *zDb, int nFrame){ if( g.nThreshold>0 ){ if( nFrame>=g.nThreshold ){ g.bCkptRequired = 1; } }else{ ClientCtx *pCtx = (ClientCtx*)pArg; if( pCtx->nClientThreshold && nFrame>=pCtx->nClientThreshold ){ pCtx->bClientCkptRequired = 1; } } return SQLITE_OK; } static int handle_run_command(ClientCtx *p){ int i, j; int nBusy = 0; sqlite3_int64 t0 = get_timer(); sqlite3_int64 t1 = t0; int nT1 = 0; int nTBusy1 = 0; int rc = SQLITE_OK; pthread_mutex_lock(&g.ckpt_mutex); g.nRun++; pthread_mutex_unlock(&g.ckpt_mutex); for(j=0; (p->nRepeat<=0 || jnRepeat) && rc==SQLITE_OK; j++){ sqlite3_int64 t2; for(i=0; inPrepare && rc==SQLITE_OK; i++){ sqlite3_stmt *pStmt = p->aPrepare[i].pStmt; /* If the bMutex flag is set, grab g.commit_mutex before executing ** the SQL statement (which is always "COMMIT" in this case). */ if( p->aPrepare[i].bMutex ){ sqlite3_mutex_enter(g.commit_mutex); } /* Execute the statement */ while( sqlite3_step(pStmt)==SQLITE_ROW ); rc = sqlite3_reset(pStmt); /* Relinquish the g.commit_mutex mutex if required. */ if( p->aPrepare[i].bMutex ){ sqlite3_mutex_leave(g.commit_mutex); } if( (rc & 0xFF)==SQLITE_BUSY ){ if( sqlite3_get_autocommit(p->db)==0 ){ sqlite3_exec(p->db, "ROLLBACK", 0, 0, 0); } nBusy++; rc = SQLITE_OK; break; } else if( rc!=SQLITE_OK ){ send_message(p, "error - %s (eec=%d)\n", sqlite3_errmsg(p->db), sqlite3_extended_errcode(p->db) ); } } t2 = get_timer(); if( t2>=(t1+1000) ){ int nMs = (t2 - t1); int nDone = (j+1 - nBusy - nT1); rc = send_message( p, "(%d done @ %d per second, %d busy)\n", nDone, (1000*nDone + nMs/2) / nMs, nBusy - nTBusy1 ); t1 = t2; nT1 = j+1 - nBusy; nTBusy1 = nBusy; if( p->nSecond>0 && (p->nSecond*1000)<=t1-t0 ) break; } /* Global checkpoint handling. */ if( g.nThreshold>0 ){ pthread_mutex_lock(&g.ckpt_mutex); if( rc==SQLITE_OK && g.bCkptRequired ){ if( g.nWait==g.nRun-1 ){ /* All other clients are already waiting on the condition variable. ** Run the checkpoint, signal the condition and move on. */ rc = sqlite3_wal_checkpoint(p->db, "main"); g.bCkptRequired = 0; pthread_cond_broadcast(&g.ckpt_cond); }else{ assert( g.nWaitbClientCkptRequired ){ rc = sqlite3_wal_checkpoint(p->db, "main"); assert( rc==SQLITE_OK ); p->bClientCkptRequired = 0; } } if( rc==SQLITE_OK ){ int nMs = (int)(get_timer() - t0); send_message(p, "ok (%d/%d SQLITE_BUSY)\n", nBusy, j); if( p->nRepeat<=0 ){ send_message(p, "### ok %d busy %d ms %d\n", j-nBusy, nBusy, nMs); } } clear_sql(p); pthread_mutex_lock(&g.ckpt_mutex); g.nRun--; pthread_mutex_unlock(&g.ckpt_mutex); return rc; } static int handle_dot_command(ClientCtx *p, const char *zCmd, int nCmd){ int n; int rc = 0; const char *z = &zCmd[1]; const char *zArg; int nArg; assert( zCmd[0]=='.' ); for(n=0; n<(nCmd-1); n++){ if( is_whitespace(z[n]) ) break; } zArg = &z[n]; nArg = nCmd-n; trim_string(&zArg, &nArg); if( n>=1 && n<=4 && 0==strncmp(z, "list", n) ){ int i; for(i=0; rc==0 && inPrepare; i++){ const char *zSql = sqlite3_sql(p->aPrepare[i].pStmt); int nSql = strlen(zSql); trim_string(&zSql, &nSql); rc = send_message(p, "%d: %.*s\n", i, nSql, zSql); } } else if( n>=1 && n<=4 && 0==strncmp(z, "quit", n) ){ rc = 1; } else if( n>=2 && n<=7 && 0==strncmp(z, "repeats", n) ){ if( nArg ){ p->nRepeat = strtol(zArg, 0, 0); if( p->nRepeat>0 ) p->nSecond = 0; } rc = send_message(p, "ok (repeat=%d)\n", p->nRepeat); } else if( n>=2 && n<=3 && 0==strncmp(z, "run", n) ){ rc = handle_run_command(p); } else if( n>=2 && n<=7 && 0==strncmp(z, "seconds", n) ){ if( nArg ){ p->nSecond = strtol(zArg, 0, 0); if( p->nSecond>0 ) p->nRepeat = 0; } rc = send_message(p, "ok (seconds=%d)\n", p->nSecond); } else if( n>=1 && n<=12 && 0==strncmp(z, "mutex_commit", n) ){ rc = handle_some_sql(p, "COMMIT;", 7); if( rc==SQLITE_OK ){ p->aPrepare[p->nPrepare-1].bMutex = 1; } } else if( n>=1 && n<=10 && 0==strncmp(z, "checkpoint", n) ){ if( nArg ){ p->nClientThreshold = strtol(zArg, 0, 0); } rc = send_message(p, "ok (checkpoint=%d)\n", p->nClientThreshold); } else if( n>=2 && n<=4 && 0==strncmp(z, "stop", n) ){ sqlite3_close(g.db); exit(0); } else{ send_message(p, "unrecognized dot command: %.*s\n" "should be \"list\", \"run\", \"repeats\", \"mutex_commit\", " "\"checkpoint\" or \"seconds\"\n", n, z ); rc = 1; } return rc; } static void *handle_client(void *pArg){ char zCmd[32*1024]; /* Read buffer */ int nCmd = 0; /* Valid bytes in zCmd[] */ int res; /* Result of read() call */ int rc = SQLITE_OK; ClientCtx ctx; memset(&ctx, 0, sizeof(ClientCtx)); ctx.fd = (int)(intptr_t)pArg; ctx.nRepeat = 1; rc = sqlite3_open_v2(g.zDatabaseName, &ctx.db, SQLITE_OPEN_READWRITE|SQLITE_OPEN_CREATE, g.zVfs ); if( rc!=SQLITE_OK ){ fprintf(stderr, "sqlite3_open(): %s\n", sqlite3_errmsg(ctx.db)); return 0; } sqlite3_create_function( ctx.db, "usleep", 1, SQLITE_UTF8, (void*)sqlite3_vfs_find(0), usleepFunc, 0, 0 ); /* Register the wal-hook with the new client connection */ sqlite3_wal_hook(ctx.db, clientWalHook, (void*)&ctx); while( rc==SQLITE_OK ){ int i; int iStart; int nConsume; res = read(ctx.fd, &zCmd[nCmd], sizeof(zCmd)-nCmd-1); if( res<=0 ) break; nCmd += res; if( nCmd>=sizeof(zCmd)-1 ){ fprintf(stderr, "oversized (>32KiB) message\n"); res = 0; break; } zCmd[nCmd] = '\0'; do { nConsume = 0; /* Gobble up any whitespace */ iStart = 0; while( is_whitespace(zCmd[iStart]) ) iStart++; if( zCmd[iStart]=='.' ){ /* This is a dot-command. Search for end-of-line. */ for(i=iStart; i0 ){ nCmd = nCmd-nConsume; if( nCmd>0 ){ memmove(zCmd, &zCmd[nConsume], nCmd); } } }while( rc==SQLITE_OK && nConsume>0 ); } fprintf(stdout, "Client %d disconnects\n", ctx.fd); fflush(stdout); close(ctx.fd); clear_sql(&ctx); sqlite3_free(ctx.aPrepare); sqlite3_close(ctx.db); return 0; } static void usage(const char *zExec){ fprintf(stderr, "Usage: %s ?-vfs VFS? DATABASE\n", zExec); exit(1); } int main(int argc, char *argv[]) { int sfd; int rc; int yes = 1; struct sockaddr_in server; int i; /* Ignore SIGPIPE. Otherwise the server exits if a client disconnects ** abruptly. */ signal(SIGPIPE, SIG_IGN); g.nThreshold = TSERVER_DEFAULT_CHECKPOINT_THRESHOLD; if( (argc%2) ) usage(argv[0]); for(i=1; i<(argc-1); i+=2){ int n = strlen(argv[i]); if( n>=2 && 0==sqlite3_strnicmp("-walautocheckpoint", argv[i], n) ){ g.nThreshold = strtol(argv[i+1], 0, 0); }else if( n>=2 && 0==sqlite3_strnicmp("-vfs", argv[i], n) ){ g.zVfs = argv[i+1]; } } g.zDatabaseName = argv[argc-1]; g.commit_mutex = sqlite3_mutex_alloc(SQLITE_MUTEX_FAST); pthread_mutex_init(&g.ckpt_mutex, 0); pthread_cond_init(&g.ckpt_cond, 0); rc = sqlite3_open_v2(g.zDatabaseName, &g.db, SQLITE_OPEN_READWRITE|SQLITE_OPEN_CREATE, g.zVfs ); if( rc!=SQLITE_OK ){ fprintf(stderr, "sqlite3_open(): %s\n", sqlite3_errmsg(g.db)); return 1; } rc = sqlite3_exec(g.db, "SELECT * FROM sqlite_master", 0, 0, 0); if( rc!=SQLITE_OK ){ fprintf(stderr, "sqlite3_exec(): %s\n", sqlite3_errmsg(g.db)); return 1; } sfd = socket(AF_INET, SOCK_STREAM, 0); if( sfd<0 ){ fprintf(stderr, "socket() failed\n"); return 1; } rc = setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)); if( rc<0 ){ perror("setsockopt"); return 1; } memset(&server, 0, sizeof(server)); server.sin_family = AF_INET; server.sin_addr.s_addr = inet_addr("127.0.0.1"); server.sin_port = htons(TSERVER_PORTNUMBER); rc = bind(sfd, (struct sockaddr *)&server, sizeof(struct sockaddr)); if( rc<0 ){ fprintf(stderr, "bind() failed\n"); return 1; } rc = listen(sfd, 8); if( rc<0 ){ fprintf(stderr, "listen() failed\n"); return 1; } while( 1 ){ pthread_t tid; int cfd = accept(sfd, NULL, NULL); if( cfd<0 ){ perror("accept()"); return 1; } fprintf(stdout, "Client %d connects\n", cfd); fflush(stdout); rc = pthread_create(&tid, NULL, handle_client, (void*)(intptr_t)cfd); if( rc!=0 ){ perror("pthread_create()"); return 1; } pthread_detach(tid); } return 0; }