⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 test_async.c

📁 最新的sqlite3.6.2源代码
💻 C
📖 第 1 页 / 共 4 页
字号:
**     async.queueMutex**     async.writerMutex**     async.lockMutex**** If NDEBUG is defined, these wrappers do nothing except call the ** corresponding pthreads function. If NDEBUG is not defined, then the** following variables are used to store the thread-id (as returned** by pthread_self()) currently holding the mutex, or 0 otherwise:****     asyncdebug.queueMutexHolder**     asyncdebug.writerMutexHolder**     asyncdebug.lockMutexHolder**** These variables are used by some assert() statements that verify** the statements made in the "Deadlock Prevention" notes earlier** in this file.*/#ifndef NDEBUGstatic struct TestAsyncDebugData {  pthread_t lockMutexHolder;  pthread_t queueMutexHolder;  pthread_t writerMutexHolder;} asyncdebug = {0, 0, 0};/*** Wrapper around pthread_mutex_lock(). Checks that we have not violated** the anti-deadlock rules (see "Deadlock prevention" above).*/static int async_mutex_lock(pthread_mutex_t *pMutex){  int iIdx;  int rc;  pthread_mutex_t *aMutex = (pthread_mutex_t *)(&async);  pthread_t *aHolder = (pthread_t *)(&asyncdebug);  /* The code in this 'ifndef NDEBUG' block depends on a certain alignment   * of the variables in TestAsyncStaticData and TestAsyncDebugData. The   * following assert() statements check that this has not been changed.   *   * Really, these only need to be run once at startup time.   */  assert(&(aMutex[0])==&async.lockMutex);  assert(&(aMutex[1])==&async.queueMutex);  assert(&(aMutex[2])==&async.writerMutex);  assert(&(aHolder[0])==&asyncdebug.lockMutexHolder);  assert(&(aHolder[1])==&asyncdebug.queueMutexHolder);  assert(&(aHolder[2])==&asyncdebug.writerMutexHolder);  assert( pthread_self()!=0 );  for(iIdx=0; iIdx<3; iIdx++){    if( pMutex==&aMutex[iIdx] ) break;    /* This is the key assert(). Here we are checking that if the caller     * is trying to block on async.writerMutex, neither of the other two     * mutex are held. If the caller is trying to block on async.queueMutex,     * lockMutex is not held.     */    assert(!pthread_equal(aHolder[iIdx], pthread_self()));  }  assert(iIdx<3);  rc = pthread_mutex_lock(pMutex);  if( rc==0 ){    assert(aHolder[iIdx]==0);    aHolder[iIdx] = pthread_self();  }  return rc;}/*** Wrapper around pthread_mutex_unlock().*/static int async_mutex_unlock(pthread_mutex_t *pMutex){  int iIdx;  int rc;  pthread_mutex_t *aMutex = (pthread_mutex_t *)(&async);  pthread_t *aHolder = (pthread_t *)(&asyncdebug);  for(iIdx=0; iIdx<3; iIdx++){    if( pMutex==&aMutex[iIdx] ) break;  }  assert(iIdx<3);  assert(pthread_equal(aHolder[iIdx], pthread_self()));  aHolder[iIdx] = 0;  rc = pthread_mutex_unlock(pMutex);  assert(rc==0);  return 0;}/*** Wrapper around pthread_mutex_trylock().*/static int async_mutex_trylock(pthread_mutex_t *pMutex){  int iIdx;  int rc;  pthread_mutex_t *aMutex = (pthread_mutex_t *)(&async);  pthread_t *aHolder = (pthread_t *)(&asyncdebug);  for(iIdx=0; iIdx<3; iIdx++){    if( pMutex==&aMutex[iIdx] ) break;  }  assert(iIdx<3);  rc = pthread_mutex_trylock(pMutex);  if( rc==0 ){    assert(aHolder[iIdx]==0);    aHolder[iIdx] = pthread_self();  }  return rc;}/*** Wrapper around pthread_cond_wait().*/static int async_cond_wait(pthread_cond_t *pCond, pthread_mutex_t *pMutex){  int iIdx;  int rc;  pthread_mutex_t *aMutex = (pthread_mutex_t *)(&async);  pthread_t *aHolder = (pthread_t *)(&asyncdebug);  for(iIdx=0; iIdx<3; iIdx++){    if( pMutex==&aMutex[iIdx] ) break;  }  assert(iIdx<3);  assert(pthread_equal(aHolder[iIdx],pthread_self()));  aHolder[iIdx] = 0;  rc = pthread_cond_wait(pCond, pMutex);  if( rc==0 ){    aHolder[iIdx] = pthread_self();  }  return rc;}/* Call our async_XX wrappers instead of selected pthread_XX functions */#define pthread_mutex_lock    async_mutex_lock#define pthread_mutex_unlock  async_mutex_unlock#define pthread_mutex_trylock async_mutex_trylock#define pthread_cond_wait     async_cond_wait#endif   /* !defined(NDEBUG) *//*** Add an entry to the end of the global write-op list. pWrite should point ** to an AsyncWrite structure allocated using sqlite3_malloc().  The writer** thread will call sqlite3_free() to free the structure after the specified** operation has been completed.**** Once an AsyncWrite structure has been added to the list, it becomes the** property of the writer thread and must not be read or modified by the** caller.  */static void addAsyncWrite(AsyncWrite *pWrite){  /* We must hold the queue mutex in order to modify the queue pointers */  pthread_mutex_lock(&async.queueMutex);  /* Add the record to the end of the write-op queue */  assert( !pWrite->pNext );  if( async.pQueueLast ){    assert( async.pQueueFirst );    async.pQueueLast->pNext = pWrite;  }else{    async.pQueueFirst = pWrite;  }  async.pQueueLast = pWrite;  ASYNC_TRACE(("PUSH %p (%s %s %d)\n", pWrite, azOpcodeName[pWrite->op],         pWrite->pFileData ? pWrite->pFileData->zName : "-", pWrite->iOffset));  if( pWrite->op==ASYNC_CLOSE ){    async.nFile--;  }  /* Drop the queue mutex */  pthread_mutex_unlock(&async.queueMutex);  /* The writer thread might have been idle because there was nothing  ** on the write-op queue for it to do.  So wake it up. */  pthread_cond_signal(&async.queueSignal);}/*** Increment async.nFile in a thread-safe manner.*/static void incrOpenFileCount(){  /* We must hold the queue mutex in order to modify async.nFile */  pthread_mutex_lock(&async.queueMutex);  if( async.nFile==0 ){    async.ioError = SQLITE_OK;  }  async.nFile++;  pthread_mutex_unlock(&async.queueMutex);}/*** This is a utility function to allocate and populate a new AsyncWrite** structure and insert it (via addAsyncWrite() ) into the global list.*/static int addNewAsyncWrite(  AsyncFileData *pFileData,   int op,   i64 iOffset,   int nByte,  const char *zByte){  AsyncWrite *p;  if( op!=ASYNC_CLOSE && async.ioError ){    return async.ioError;  }  p = sqlite3_malloc(sizeof(AsyncWrite) + (zByte?nByte:0));  if( !p ){    /* The upper layer does not expect operations like OsWrite() to    ** return SQLITE_NOMEM. This is partly because under normal conditions    ** SQLite is required to do rollback without calling malloc(). So    ** if malloc() fails here, treat it as an I/O error. The above    ** layer knows how to handle that.    */    return SQLITE_IOERR;  }  p->op = op;  p->iOffset = iOffset;  p->nByte = nByte;  p->pFileData = pFileData;  p->pNext = 0;  if( zByte ){    p->zBuf = (char *)&p[1];    memcpy(p->zBuf, zByte, nByte);  }else{    p->zBuf = 0;  }  addAsyncWrite(p);  return SQLITE_OK;}/*** Close the file. This just adds an entry to the write-op list, the file is** not actually closed.*/static int asyncClose(sqlite3_file *pFile){  AsyncFileData *p = ((AsyncFile *)pFile)->pData;  /* Unlock the file, if it is locked */  pthread_mutex_lock(&async.lockMutex);  p->lock.eLock = 0;  pthread_mutex_unlock(&async.lockMutex);  addAsyncWrite(&p->close);  return SQLITE_OK;}/*** Implementation of sqlite3OsWrite() for asynchronous files. Instead of ** writing to the underlying file, this function adds an entry to the end of** the global AsyncWrite list. Either SQLITE_OK or SQLITE_NOMEM may be** returned.*/static int asyncWrite(sqlite3_file *pFile, const void *pBuf, int amt, i64 iOff){  AsyncFileData *p = ((AsyncFile *)pFile)->pData;  return addNewAsyncWrite(p, ASYNC_WRITE, iOff, amt, pBuf);}/*** Read data from the file. First we read from the filesystem, then adjust ** the contents of the buffer based on ASYNC_WRITE operations in the ** write-op queue.**** This method holds the mutex from start to finish.*/static int asyncRead(sqlite3_file *pFile, void *zOut, int iAmt, i64 iOffset){  AsyncFileData *p = ((AsyncFile *)pFile)->pData;  int rc = SQLITE_OK;  i64 filesize;  int nRead;  sqlite3_file *pBase = p->pBaseRead;  /* Grab the write queue mutex for the duration of the call */  pthread_mutex_lock(&async.queueMutex);  /* If an I/O error has previously occurred in this virtual file   ** system, then all subsequent operations fail.  */  if( async.ioError!=SQLITE_OK ){    rc = async.ioError;    goto asyncread_out;  }  if( pBase->pMethods ){    rc = sqlite3OsFileSize(pBase, &filesize);    if( rc!=SQLITE_OK ){      goto asyncread_out;    }    nRead = MIN(filesize - iOffset, iAmt);    if( nRead>0 ){      rc = sqlite3OsRead(pBase, zOut, nRead, iOffset);      ASYNC_TRACE(("READ %s %d bytes at %d\n", p->zName, nRead, iOffset));    }  }  if( rc==SQLITE_OK ){    AsyncWrite *pWrite;    char *zName = p->zName;    for(pWrite=async.pQueueFirst; pWrite; pWrite = pWrite->pNext){      if( pWrite->op==ASYNC_WRITE && pWrite->pFileData->zName==zName ){        int iBeginOut = (pWrite->iOffset-iOffset);        int iBeginIn = -iBeginOut;        int nCopy;        if( iBeginIn<0 ) iBeginIn = 0;        if( iBeginOut<0 ) iBeginOut = 0;        nCopy = MIN(pWrite->nByte-iBeginIn, iAmt-iBeginOut);        if( nCopy>0 ){          memcpy(&((char *)zOut)[iBeginOut], &pWrite->zBuf[iBeginIn], nCopy);          ASYNC_TRACE(("OVERREAD %d bytes at %d\n", nCopy, iBeginOut+iOffset));        }      }    }  }asyncread_out:  pthread_mutex_unlock(&async.queueMutex);  return rc;}/*** Truncate the file to nByte bytes in length. This just adds an entry to ** the write-op list, no IO actually takes place.*/static int asyncTruncate(sqlite3_file *pFile, i64 nByte){  AsyncFileData *p = ((AsyncFile *)pFile)->pData;  return addNewAsyncWrite(p, ASYNC_TRUNCATE, nByte, 0, 0);}/*** Sync the file. This just adds an entry to the write-op list, the ** sync() is done later by sqlite3_async_flush().*/static int asyncSync(sqlite3_file *pFile, int flags){  AsyncFileData *p = ((AsyncFile *)pFile)->pData;  return addNewAsyncWrite(p, ASYNC_SYNC, 0, flags, 0);}/*** Read the size of the file. First we read the size of the file system ** entry, then adjust for any ASYNC_WRITE or ASYNC_TRUNCATE operations ** currently in the write-op list. **** This method holds the mutex from start to finish.*/int asyncFileSize(sqlite3_file *pFile, i64 *piSize){  AsyncFileData *p = ((AsyncFile *)pFile)->pData;  int rc = SQLITE_OK;  i64 s = 0;  sqlite3_file *pBase;  pthread_mutex_lock(&async.queueMutex);  /* Read the filesystem size from the base file. If pBaseRead is NULL, this  ** means the file hasn't been opened yet. In this case all relevant data   ** must be in the write-op queue anyway, so we can omit reading from the  ** file-system.  */  pBase = p->pBaseRead;  if( pBase->pMethods ){    rc = sqlite3OsFileSize(pBase, &s);  }  if( rc==SQLITE_OK ){    AsyncWrite *pWrite;    for(pWrite=async.pQueueFirst; pWrite; pWrite = pWrite->pNext){      if( pWrite->op==ASYNC_DELETE && strcmp(p->zName, pWrite->zBuf)==0 ){        s = 0;      }else if( pWrite->pFileData && pWrite->pFileData->zName==p->zName){        switch( pWrite->op ){          case ASYNC_WRITE:            s = MAX(pWrite->iOffset + (i64)(pWrite->nByte), s);            break;          case ASYNC_TRUNCATE:            s = MIN(s, pWrite->iOffset);            break;        }      }    }    *piSize = s;  }  pthread_mutex_unlock(&async.queueMutex);  return rc;}/*** Lock or unlock the actual file-system entry.*/static int getFileLock(AsyncLock *pLock){  int rc = SQLITE_OK;  AsyncFileLock *pIter;  int eRequired = 0;  if( pLock->pFile ){    for(pIter=pLock->pList; pIter; pIter=pIter->pNext){      assert(pIter->eAsyncLock>=pIter->eLock);      if( pIter->eAsyncLock>eRequired ){        eRequired = pIter->eAsyncLock;        assert(eRequired>=0 && eRequired<=SQLITE_LOCK_EXCLUSIVE);      }    }    if( eRequired>pLock->eLock ){      rc = sqlite3OsLock(pLock->pFile, eRequired);      if( rc==SQLITE_OK ){        pLock->eLock = eRequired;      }    }    else if( eRequired<pLock->eLock && eRequired<=SQLITE_LOCK_SHARED ){      rc = sqlite3OsUnlock(pLock->pFile, eRequired);      if( rc==SQLITE_OK ){        pLock->eLock = eRequired;      }    }  }  return rc;}/*** The following two methods - asyncLock() and asyncUnlock() - are used** to obtain and release locks on database files opened with the** asynchronous backend.

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -