📄 test_async.c
字号:
** async.queueMutex** async.writerMutex** async.lockMutex**** If NDEBUG is defined, these wrappers do nothing except call the ** corresponding pthreads function. If NDEBUG is not defined, then the** following variables are used to store the thread-id (as returned** by pthread_self()) currently holding the mutex, or 0 otherwise:**** asyncdebug.queueMutexHolder** asyncdebug.writerMutexHolder** asyncdebug.lockMutexHolder**** These variables are used by some assert() statements that verify** the statements made in the "Deadlock Prevention" notes earlier** in this file.*/#ifndef NDEBUGstatic struct TestAsyncDebugData { pthread_t lockMutexHolder; pthread_t queueMutexHolder; pthread_t writerMutexHolder;} asyncdebug = {0, 0, 0};/*** Wrapper around pthread_mutex_lock(). Checks that we have not violated** the anti-deadlock rules (see "Deadlock prevention" above).*/static int async_mutex_lock(pthread_mutex_t *pMutex){ int iIdx; int rc; pthread_mutex_t *aMutex = (pthread_mutex_t *)(&async); pthread_t *aHolder = (pthread_t *)(&asyncdebug); /* The code in this 'ifndef NDEBUG' block depends on a certain alignment * of the variables in TestAsyncStaticData and TestAsyncDebugData. The * following assert() statements check that this has not been changed. * * Really, these only need to be run once at startup time. */ assert(&(aMutex[0])==&async.lockMutex); assert(&(aMutex[1])==&async.queueMutex); assert(&(aMutex[2])==&async.writerMutex); assert(&(aHolder[0])==&asyncdebug.lockMutexHolder); assert(&(aHolder[1])==&asyncdebug.queueMutexHolder); assert(&(aHolder[2])==&asyncdebug.writerMutexHolder); assert( pthread_self()!=0 ); for(iIdx=0; iIdx<3; iIdx++){ if( pMutex==&aMutex[iIdx] ) break; /* This is the key assert(). Here we are checking that if the caller * is trying to block on async.writerMutex, neither of the other two * mutex are held. If the caller is trying to block on async.queueMutex, * lockMutex is not held. */ assert(!pthread_equal(aHolder[iIdx], pthread_self())); } assert(iIdx<3); rc = pthread_mutex_lock(pMutex); if( rc==0 ){ assert(aHolder[iIdx]==0); aHolder[iIdx] = pthread_self(); } return rc;}/*** Wrapper around pthread_mutex_unlock().*/static int async_mutex_unlock(pthread_mutex_t *pMutex){ int iIdx; int rc; pthread_mutex_t *aMutex = (pthread_mutex_t *)(&async); pthread_t *aHolder = (pthread_t *)(&asyncdebug); for(iIdx=0; iIdx<3; iIdx++){ if( pMutex==&aMutex[iIdx] ) break; } assert(iIdx<3); assert(pthread_equal(aHolder[iIdx], pthread_self())); aHolder[iIdx] = 0; rc = pthread_mutex_unlock(pMutex); assert(rc==0); return 0;}/*** Wrapper around pthread_mutex_trylock().*/static int async_mutex_trylock(pthread_mutex_t *pMutex){ int iIdx; int rc; pthread_mutex_t *aMutex = (pthread_mutex_t *)(&async); pthread_t *aHolder = (pthread_t *)(&asyncdebug); for(iIdx=0; iIdx<3; iIdx++){ if( pMutex==&aMutex[iIdx] ) break; } assert(iIdx<3); rc = pthread_mutex_trylock(pMutex); if( rc==0 ){ assert(aHolder[iIdx]==0); aHolder[iIdx] = pthread_self(); } return rc;}/*** Wrapper around pthread_cond_wait().*/static int async_cond_wait(pthread_cond_t *pCond, pthread_mutex_t *pMutex){ int iIdx; int rc; pthread_mutex_t *aMutex = (pthread_mutex_t *)(&async); pthread_t *aHolder = (pthread_t *)(&asyncdebug); for(iIdx=0; iIdx<3; iIdx++){ if( pMutex==&aMutex[iIdx] ) break; } assert(iIdx<3); assert(pthread_equal(aHolder[iIdx],pthread_self())); aHolder[iIdx] = 0; rc = pthread_cond_wait(pCond, pMutex); if( rc==0 ){ aHolder[iIdx] = pthread_self(); } return rc;}/* Call our async_XX wrappers instead of selected pthread_XX functions */#define pthread_mutex_lock async_mutex_lock#define pthread_mutex_unlock async_mutex_unlock#define pthread_mutex_trylock async_mutex_trylock#define pthread_cond_wait async_cond_wait#endif /* !defined(NDEBUG) *//*** Add an entry to the end of the global write-op list. pWrite should point ** to an AsyncWrite structure allocated using sqlite3_malloc(). The writer** thread will call sqlite3_free() to free the structure after the specified** operation has been completed.**** Once an AsyncWrite structure has been added to the list, it becomes the** property of the writer thread and must not be read or modified by the** caller. */static void addAsyncWrite(AsyncWrite *pWrite){ /* We must hold the queue mutex in order to modify the queue pointers */ pthread_mutex_lock(&async.queueMutex); /* Add the record to the end of the write-op queue */ assert( !pWrite->pNext ); if( async.pQueueLast ){ assert( async.pQueueFirst ); async.pQueueLast->pNext = pWrite; }else{ async.pQueueFirst = pWrite; } async.pQueueLast = pWrite; ASYNC_TRACE(("PUSH %p (%s %s %d)\n", pWrite, azOpcodeName[pWrite->op], pWrite->pFileData ? pWrite->pFileData->zName : "-", pWrite->iOffset)); if( pWrite->op==ASYNC_CLOSE ){ async.nFile--; } /* Drop the queue mutex */ pthread_mutex_unlock(&async.queueMutex); /* The writer thread might have been idle because there was nothing ** on the write-op queue for it to do. So wake it up. */ pthread_cond_signal(&async.queueSignal);}/*** Increment async.nFile in a thread-safe manner.*/static void incrOpenFileCount(){ /* We must hold the queue mutex in order to modify async.nFile */ pthread_mutex_lock(&async.queueMutex); if( async.nFile==0 ){ async.ioError = SQLITE_OK; } async.nFile++; pthread_mutex_unlock(&async.queueMutex);}/*** This is a utility function to allocate and populate a new AsyncWrite** structure and insert it (via addAsyncWrite() ) into the global list.*/static int addNewAsyncWrite( AsyncFileData *pFileData, int op, i64 iOffset, int nByte, const char *zByte){ AsyncWrite *p; if( op!=ASYNC_CLOSE && async.ioError ){ return async.ioError; } p = sqlite3_malloc(sizeof(AsyncWrite) + (zByte?nByte:0)); if( !p ){ /* The upper layer does not expect operations like OsWrite() to ** return SQLITE_NOMEM. This is partly because under normal conditions ** SQLite is required to do rollback without calling malloc(). So ** if malloc() fails here, treat it as an I/O error. The above ** layer knows how to handle that. */ return SQLITE_IOERR; } p->op = op; p->iOffset = iOffset; p->nByte = nByte; p->pFileData = pFileData; p->pNext = 0; if( zByte ){ p->zBuf = (char *)&p[1]; memcpy(p->zBuf, zByte, nByte); }else{ p->zBuf = 0; } addAsyncWrite(p); return SQLITE_OK;}/*** Close the file. This just adds an entry to the write-op list, the file is** not actually closed.*/static int asyncClose(sqlite3_file *pFile){ AsyncFileData *p = ((AsyncFile *)pFile)->pData; /* Unlock the file, if it is locked */ pthread_mutex_lock(&async.lockMutex); p->lock.eLock = 0; pthread_mutex_unlock(&async.lockMutex); addAsyncWrite(&p->close); return SQLITE_OK;}/*** Implementation of sqlite3OsWrite() for asynchronous files. Instead of ** writing to the underlying file, this function adds an entry to the end of** the global AsyncWrite list. Either SQLITE_OK or SQLITE_NOMEM may be** returned.*/static int asyncWrite(sqlite3_file *pFile, const void *pBuf, int amt, i64 iOff){ AsyncFileData *p = ((AsyncFile *)pFile)->pData; return addNewAsyncWrite(p, ASYNC_WRITE, iOff, amt, pBuf);}/*** Read data from the file. First we read from the filesystem, then adjust ** the contents of the buffer based on ASYNC_WRITE operations in the ** write-op queue.**** This method holds the mutex from start to finish.*/static int asyncRead(sqlite3_file *pFile, void *zOut, int iAmt, i64 iOffset){ AsyncFileData *p = ((AsyncFile *)pFile)->pData; int rc = SQLITE_OK; i64 filesize; int nRead; sqlite3_file *pBase = p->pBaseRead; /* Grab the write queue mutex for the duration of the call */ pthread_mutex_lock(&async.queueMutex); /* If an I/O error has previously occurred in this virtual file ** system, then all subsequent operations fail. */ if( async.ioError!=SQLITE_OK ){ rc = async.ioError; goto asyncread_out; } if( pBase->pMethods ){ rc = sqlite3OsFileSize(pBase, &filesize); if( rc!=SQLITE_OK ){ goto asyncread_out; } nRead = MIN(filesize - iOffset, iAmt); if( nRead>0 ){ rc = sqlite3OsRead(pBase, zOut, nRead, iOffset); ASYNC_TRACE(("READ %s %d bytes at %d\n", p->zName, nRead, iOffset)); } } if( rc==SQLITE_OK ){ AsyncWrite *pWrite; char *zName = p->zName; for(pWrite=async.pQueueFirst; pWrite; pWrite = pWrite->pNext){ if( pWrite->op==ASYNC_WRITE && pWrite->pFileData->zName==zName ){ int iBeginOut = (pWrite->iOffset-iOffset); int iBeginIn = -iBeginOut; int nCopy; if( iBeginIn<0 ) iBeginIn = 0; if( iBeginOut<0 ) iBeginOut = 0; nCopy = MIN(pWrite->nByte-iBeginIn, iAmt-iBeginOut); if( nCopy>0 ){ memcpy(&((char *)zOut)[iBeginOut], &pWrite->zBuf[iBeginIn], nCopy); ASYNC_TRACE(("OVERREAD %d bytes at %d\n", nCopy, iBeginOut+iOffset)); } } } }asyncread_out: pthread_mutex_unlock(&async.queueMutex); return rc;}/*** Truncate the file to nByte bytes in length. This just adds an entry to ** the write-op list, no IO actually takes place.*/static int asyncTruncate(sqlite3_file *pFile, i64 nByte){ AsyncFileData *p = ((AsyncFile *)pFile)->pData; return addNewAsyncWrite(p, ASYNC_TRUNCATE, nByte, 0, 0);}/*** Sync the file. This just adds an entry to the write-op list, the ** sync() is done later by sqlite3_async_flush().*/static int asyncSync(sqlite3_file *pFile, int flags){ AsyncFileData *p = ((AsyncFile *)pFile)->pData; return addNewAsyncWrite(p, ASYNC_SYNC, 0, flags, 0);}/*** Read the size of the file. First we read the size of the file system ** entry, then adjust for any ASYNC_WRITE or ASYNC_TRUNCATE operations ** currently in the write-op list. **** This method holds the mutex from start to finish.*/int asyncFileSize(sqlite3_file *pFile, i64 *piSize){ AsyncFileData *p = ((AsyncFile *)pFile)->pData; int rc = SQLITE_OK; i64 s = 0; sqlite3_file *pBase; pthread_mutex_lock(&async.queueMutex); /* Read the filesystem size from the base file. If pBaseRead is NULL, this ** means the file hasn't been opened yet. In this case all relevant data ** must be in the write-op queue anyway, so we can omit reading from the ** file-system. */ pBase = p->pBaseRead; if( pBase->pMethods ){ rc = sqlite3OsFileSize(pBase, &s); } if( rc==SQLITE_OK ){ AsyncWrite *pWrite; for(pWrite=async.pQueueFirst; pWrite; pWrite = pWrite->pNext){ if( pWrite->op==ASYNC_DELETE && strcmp(p->zName, pWrite->zBuf)==0 ){ s = 0; }else if( pWrite->pFileData && pWrite->pFileData->zName==p->zName){ switch( pWrite->op ){ case ASYNC_WRITE: s = MAX(pWrite->iOffset + (i64)(pWrite->nByte), s); break; case ASYNC_TRUNCATE: s = MIN(s, pWrite->iOffset); break; } } } *piSize = s; } pthread_mutex_unlock(&async.queueMutex); return rc;}/*** Lock or unlock the actual file-system entry.*/static int getFileLock(AsyncLock *pLock){ int rc = SQLITE_OK; AsyncFileLock *pIter; int eRequired = 0; if( pLock->pFile ){ for(pIter=pLock->pList; pIter; pIter=pIter->pNext){ assert(pIter->eAsyncLock>=pIter->eLock); if( pIter->eAsyncLock>eRequired ){ eRequired = pIter->eAsyncLock; assert(eRequired>=0 && eRequired<=SQLITE_LOCK_EXCLUSIVE); } } if( eRequired>pLock->eLock ){ rc = sqlite3OsLock(pLock->pFile, eRequired); if( rc==SQLITE_OK ){ pLock->eLock = eRequired; } } else if( eRequired<pLock->eLock && eRequired<=SQLITE_LOCK_SHARED ){ rc = sqlite3OsUnlock(pLock->pFile, eRequired); if( rc==SQLITE_OK ){ pLock->eLock = eRequired; } } } return rc;}/*** The following two methods - asyncLock() and asyncUnlock() - are used** to obtain and release locks on database files opened with the** asynchronous backend.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -