📄 test_async.c
字号:
async_vfs.mxPathname = ((sqlite3_vfs *)async_vfs.pAppData)->mxPathname; sqlite3_vfs_register(&async_vfs, 1); if( !hashTableInit ){ sqlite3HashInit(&async.aLock, SQLITE_HASH_BINARY, 1); hashTableInit = 1; } } }else{ if( async_vfs.pAppData ){ sqlite3_vfs_unregister(&async_vfs); async_vfs.pAppData = 0; } }}/* ** This procedure runs in a separate thread, reading messages off of the** write queue and processing them one by one. **** If async.writerHaltNow is true, then this procedure exits** after processing a single message.**** If async.writerHaltWhenIdle is true, then this procedure exits when** the write queue is empty.**** If both of the above variables are false, this procedure runs** indefinately, waiting for operations to be added to the write queue** and processing them in the order in which they arrive.**** An artifical delay of async.ioDelay milliseconds is inserted before** each write operation in order to simulate the effect of a slow disk.**** Only one instance of this procedure may be running at a time.*/static void *asyncWriterThread(void *pIsStarted){ sqlite3_vfs *pVfs = (sqlite3_vfs *)(async_vfs.pAppData); AsyncWrite *p = 0; int rc = SQLITE_OK; int holdingMutex = 0; if( pthread_mutex_trylock(&async.writerMutex) ){ return 0; } (*(int *)pIsStarted) = 1; while( async.writerHaltNow==0 ){ int doNotFree = 0; sqlite3_file *pBase = 0; if( !holdingMutex ){ pthread_mutex_lock(&async.queueMutex); } while( (p = async.pQueueFirst)==0 ){ pthread_cond_broadcast(&async.emptySignal); if( async.writerHaltWhenIdle ){ pthread_mutex_unlock(&async.queueMutex); break; }else{ ASYNC_TRACE(("IDLE\n")); pthread_cond_wait(&async.queueSignal, &async.queueMutex); ASYNC_TRACE(("WAKEUP\n")); } } if( p==0 ) break; holdingMutex = 1; /* Right now this thread is holding the mutex on the write-op queue. ** Variable 'p' points to the first entry in the write-op queue. In ** the general case, we hold on to the mutex for the entire body of ** the loop. ** ** However in the cases enumerated below, we relinquish the mutex, ** perform the IO, and then re-request the mutex before removing 'p' from ** the head of the write-op queue. The idea is to increase concurrency with ** sqlite threads. ** ** * An ASYNC_CLOSE operation. ** * An ASYNC_OPENEXCLUSIVE operation. For this one, we relinquish ** the mutex, call the underlying xOpenExclusive() function, then ** re-aquire the mutex before seting the AsyncFile.pBaseRead ** variable. ** * ASYNC_SYNC and ASYNC_WRITE operations, if ** SQLITE_ASYNC_TWO_FILEHANDLES was set at compile time and two ** file-handles are open for the particular file being "synced". */ if( async.ioError!=SQLITE_OK && p->op!=ASYNC_CLOSE ){ p->op = ASYNC_NOOP; } if( p->pFileData ){ pBase = p->pFileData->pBaseWrite; if( p->op==ASYNC_CLOSE || p->op==ASYNC_OPENEXCLUSIVE || (pBase->pMethods && (p->op==ASYNC_SYNC || p->op==ASYNC_WRITE) ) ){ pthread_mutex_unlock(&async.queueMutex); holdingMutex = 0; } if( !pBase->pMethods ){ pBase = p->pFileData->pBaseRead; } } switch( p->op ){ case ASYNC_NOOP: break; case ASYNC_WRITE: assert( pBase ); ASYNC_TRACE(("WRITE %s %d bytes at %d\n", p->pFileData->zName, p->nByte, p->iOffset)); rc = sqlite3OsWrite(pBase, (void *)(p->zBuf), p->nByte, p->iOffset); break; case ASYNC_SYNC: assert( pBase ); ASYNC_TRACE(("SYNC %s\n", p->pFileData->zName)); rc = sqlite3OsSync(pBase, p->nByte); break; case ASYNC_TRUNCATE: assert( pBase ); ASYNC_TRACE(("TRUNCATE %s to %d bytes\n", p->pFileData->zName, p->iOffset)); rc = sqlite3OsTruncate(pBase, p->iOffset); break; case ASYNC_CLOSE: { AsyncFileData *pData = p->pFileData; ASYNC_TRACE(("CLOSE %s\n", p->pFileData->zName)); sqlite3OsClose(pData->pBaseWrite); sqlite3OsClose(pData->pBaseRead); /* Unlink AsyncFileData.lock from the linked list of AsyncFileLock ** structures for this file. Obtain the async.lockMutex mutex ** before doing so. */ pthread_mutex_lock(&async.lockMutex); rc = unlinkAsyncFile(pData); pthread_mutex_unlock(&async.lockMutex); async.pQueueFirst = p->pNext; sqlite3_free(pData); doNotFree = 1; break; } case ASYNC_UNLOCK: { AsyncLock *pLock; AsyncFileData *pData = p->pFileData; int eLock = p->nByte; pthread_mutex_lock(&async.lockMutex); pData->lock.eAsyncLock = MIN( pData->lock.eAsyncLock, MAX(pData->lock.eLock, eLock) ); assert(pData->lock.eAsyncLock>=pData->lock.eLock); pLock = sqlite3HashFind(&async.aLock, pData->zName, pData->nName); rc = getFileLock(pLock); pthread_mutex_unlock(&async.lockMutex); break; } case ASYNC_DELETE: ASYNC_TRACE(("DELETE %s\n", p->zBuf)); rc = sqlite3OsDelete(pVfs, p->zBuf, (int)p->iOffset); break; case ASYNC_OPENEXCLUSIVE: { int flags = (int)p->iOffset; AsyncFileData *pData = p->pFileData; ASYNC_TRACE(("OPEN %s flags=%d\n", p->zBuf, (int)p->iOffset)); assert(pData->pBaseRead->pMethods==0 && pData->pBaseWrite->pMethods==0); rc = sqlite3OsOpen(pVfs, pData->zName, pData->pBaseRead, flags, 0); assert( holdingMutex==0 ); pthread_mutex_lock(&async.queueMutex); holdingMutex = 1; break; } default: assert(!"Illegal value for AsyncWrite.op"); } /* If we didn't hang on to the mutex during the IO op, obtain it now ** so that the AsyncWrite structure can be safely removed from the ** global write-op queue. */ if( !holdingMutex ){ pthread_mutex_lock(&async.queueMutex); holdingMutex = 1; } /* ASYNC_TRACE(("UNLINK %p\n", p)); */ if( p==async.pQueueLast ){ async.pQueueLast = 0; } if( !doNotFree ){ async.pQueueFirst = p->pNext; sqlite3_free(p); } assert( holdingMutex ); /* An IO error has occured. We cannot report the error back to the ** connection that requested the I/O since the error happened ** asynchronously. The connection has already moved on. There ** really is nobody to report the error to. ** ** The file for which the error occured may have been a database or ** journal file. Regardless, none of the currently queued operations ** associated with the same database should now be performed. Nor should ** any subsequently requested IO on either a database or journal file ** handle for the same database be accepted until the main database ** file handle has been closed and reopened. ** ** Furthermore, no further IO should be queued or performed on any file ** handle associated with a database that may have been part of a ** multi-file transaction that included the database associated with ** the IO error (i.e. a database ATTACHed to the same handle at some ** point in time). */ if( rc!=SQLITE_OK ){ async.ioError = rc; } if( async.ioError && !async.pQueueFirst ){ pthread_mutex_lock(&async.lockMutex); if( 0==sqliteHashFirst(&async.aLock) ){ async.ioError = SQLITE_OK; } pthread_mutex_unlock(&async.lockMutex); } /* Drop the queue mutex before continuing to the next write operation ** in order to give other threads a chance to work with the write queue. */ if( !async.pQueueFirst || !async.ioError ){ pthread_mutex_unlock(&async.queueMutex); holdingMutex = 0; if( async.ioDelay>0 ){ sqlite3OsSleep(pVfs, async.ioDelay); }else{ sched_yield(); } } } pthread_mutex_unlock(&async.writerMutex); return 0;}/**************************************************************************** The remaining code defines a Tcl interface for testing the asynchronous** IO implementation in this file.**** To adapt the code to a non-TCL environment, delete or comment out** the code that follows.*//*** sqlite3async_enable ?YES/NO?**** Enable or disable the asynchronous I/O backend. This command is** not thread-safe. Do not call it while any database connections** are open.*/static int testAsyncEnable( void * clientData, Tcl_Interp *interp, int objc, Tcl_Obj *CONST objv[]){ if( objc!=1 && objc!=2 ){ Tcl_WrongNumArgs(interp, 1, objv, "?YES/NO?"); return TCL_ERROR; } if( objc==1 ){ Tcl_SetObjResult(interp, Tcl_NewBooleanObj(async_vfs.pAppData!=0)); }else{ int en; if( Tcl_GetBooleanFromObj(interp, objv[1], &en) ) return TCL_ERROR; asyncEnable(en); } return TCL_OK;}/*** sqlite3async_halt "now"|"idle"|"never"**** Set the conditions at which the writer thread will halt.*/static int testAsyncHalt( void * clientData, Tcl_Interp *interp, int objc, Tcl_Obj *CONST objv[]){ const char *zCond; if( objc!=2 ){ Tcl_WrongNumArgs(interp, 1, objv, "\"now\"|\"idle\"|\"never\""); return TCL_ERROR; } zCond = Tcl_GetString(objv[1]); if( strcmp(zCond, "now")==0 ){ async.writerHaltNow = 1; pthread_cond_broadcast(&async.queueSignal); }else if( strcmp(zCond, "idle")==0 ){ async.writerHaltWhenIdle = 1; async.writerHaltNow = 0; pthread_cond_broadcast(&async.queueSignal); }else if( strcmp(zCond, "never")==0 ){ async.writerHaltWhenIdle = 0; async.writerHaltNow = 0; }else{ Tcl_AppendResult(interp, "should be one of: \"now\", \"idle\", or \"never\"", (char*)0); return TCL_ERROR; } return TCL_OK;}/*** sqlite3async_delay ?MS?**** Query or set the number of milliseconds of delay in the writer** thread after each write operation. The default is 0. By increasing** the memory delay we can simulate the effect of slow disk I/O.*/static int testAsyncDelay( void * clientData, Tcl_Interp *interp, int objc, Tcl_Obj *CONST objv[]){ if( objc!=1 && objc!=2 ){ Tcl_WrongNumArgs(interp, 1, objv, "?MS?"); return TCL_ERROR; } if( objc==1 ){ Tcl_SetObjResult(interp, Tcl_NewIntObj(async.ioDelay)); }else{ int ioDelay; if( Tcl_GetIntFromObj(interp, objv[1], &ioDelay) ) return TCL_ERROR; async.ioDelay = ioDelay; } return TCL_OK;}/*** sqlite3async_start**** Start a new writer thread.*/static int testAsyncStart( void * clientData, Tcl_Interp *interp, int objc, Tcl_Obj *CONST objv[]){ pthread_t x; int rc; volatile int isStarted = 0; rc = pthread_create(&x, 0, asyncWriterThread, (void *)&isStarted); if( rc ){ Tcl_AppendResult(interp, "failed to create the thread", 0); return TCL_ERROR; } pthread_detach(x); while( isStarted==0 ){ sched_yield(); } return TCL_OK;}/*** sqlite3async_wait**** Wait for the current writer thread to terminate.**** If the current writer thread is set to run forever then this** command would block forever. To prevent that, an error is returned. */static int testAsyncWait( void * clientData, Tcl_Interp *interp, int objc, Tcl_Obj *CONST objv[]){ int cnt = 10; if( async.writerHaltNow==0 && async.writerHaltWhenIdle==0 ){ Tcl_AppendResult(interp, "would block forever", (char*)0); return TCL_ERROR; } while( cnt-- && !pthread_mutex_trylock(&async.writerMutex) ){ pthread_mutex_unlock(&async.writerMutex); sched_yield(); } if( cnt>=0 ){ ASYNC_TRACE(("WAIT\n")); pthread_mutex_lock(&async.queueMutex); pthread_cond_broadcast(&async.queueSignal); pthread_mutex_unlock(&async.queueMutex); pthread_mutex_lock(&async.writerMutex); pthread_mutex_unlock(&async.writerMutex); }else{ ASYNC_TRACE(("NO-WAIT\n")); } return TCL_OK;}#endif /* SQLITE_OS_UNIX and SQLITE_THREADSAFE *//*** This routine registers the custom TCL commands defined in this** module. This should be the only procedure visible from outside** of this module.*/int Sqlitetestasync_Init(Tcl_Interp *interp){#if SQLITE_OS_UNIX && SQLITE_THREADSAFE Tcl_CreateObjCommand(interp,"sqlite3async_enable",testAsyncEnable,0,0); Tcl_CreateObjCommand(interp,"sqlite3async_halt",testAsyncHalt,0,0); Tcl_CreateObjCommand(interp,"sqlite3async_delay",testAsyncDelay,0,0); Tcl_CreateObjCommand(interp,"sqlite3async_start",testAsyncStart,0,0); Tcl_CreateObjCommand(interp,"sqlite3async_wait",testAsyncWait,0,0); Tcl_LinkVar(interp, "sqlite3async_trace", (char*)&sqlite3async_trace, TCL_LINK_INT);#endif /* SQLITE_OS_UNIX and SQLITE_THREADSAFE */ return TCL_OK;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -