⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 test_async.c

📁 最新的sqlite3.6.2源代码
💻 C
📖 第 1 页 / 共 4 页
字号:
      async_vfs.mxPathname = ((sqlite3_vfs *)async_vfs.pAppData)->mxPathname;      sqlite3_vfs_register(&async_vfs, 1);      if( !hashTableInit ){        sqlite3HashInit(&async.aLock, SQLITE_HASH_BINARY, 1);        hashTableInit = 1;      }    }  }else{    if( async_vfs.pAppData ){      sqlite3_vfs_unregister(&async_vfs);      async_vfs.pAppData = 0;    }  }}/* ** This procedure runs in a separate thread, reading messages off of the** write queue and processing them one by one.  **** If async.writerHaltNow is true, then this procedure exits** after processing a single message.**** If async.writerHaltWhenIdle is true, then this procedure exits when** the write queue is empty.**** If both of the above variables are false, this procedure runs** indefinately, waiting for operations to be added to the write queue** and processing them in the order in which they arrive.**** An artifical delay of async.ioDelay milliseconds is inserted before** each write operation in order to simulate the effect of a slow disk.**** Only one instance of this procedure may be running at a time.*/static void *asyncWriterThread(void *pIsStarted){  sqlite3_vfs *pVfs = (sqlite3_vfs *)(async_vfs.pAppData);  AsyncWrite *p = 0;  int rc = SQLITE_OK;  int holdingMutex = 0;  if( pthread_mutex_trylock(&async.writerMutex) ){    return 0;  }  (*(int *)pIsStarted) = 1;  while( async.writerHaltNow==0 ){    int doNotFree = 0;    sqlite3_file *pBase = 0;    if( !holdingMutex ){      pthread_mutex_lock(&async.queueMutex);    }    while( (p = async.pQueueFirst)==0 ){      pthread_cond_broadcast(&async.emptySignal);      if( async.writerHaltWhenIdle ){        pthread_mutex_unlock(&async.queueMutex);        break;      }else{        ASYNC_TRACE(("IDLE\n"));        pthread_cond_wait(&async.queueSignal, &async.queueMutex);        ASYNC_TRACE(("WAKEUP\n"));      }    }    if( p==0 ) break;    holdingMutex = 1;    /* Right now this thread is holding the mutex on the write-op queue.    ** Variable 'p' points to the first entry in the write-op queue. In    ** the general case, we hold on to the mutex for the entire body of    ** the loop.     **    ** However in the cases enumerated below, we relinquish the mutex,    ** perform the IO, and then re-request the mutex before removing 'p' from    ** the head of the write-op queue. The idea is to increase concurrency with    ** sqlite threads.    **    **     * An ASYNC_CLOSE operation.    **     * An ASYNC_OPENEXCLUSIVE operation. For this one, we relinquish     **       the mutex, call the underlying xOpenExclusive() function, then    **       re-aquire the mutex before seting the AsyncFile.pBaseRead     **       variable.    **     * ASYNC_SYNC and ASYNC_WRITE operations, if     **       SQLITE_ASYNC_TWO_FILEHANDLES was set at compile time and two    **       file-handles are open for the particular file being "synced".    */    if( async.ioError!=SQLITE_OK && p->op!=ASYNC_CLOSE ){      p->op = ASYNC_NOOP;    }    if( p->pFileData ){      pBase = p->pFileData->pBaseWrite;      if(         p->op==ASYNC_CLOSE ||         p->op==ASYNC_OPENEXCLUSIVE ||        (pBase->pMethods && (p->op==ASYNC_SYNC || p->op==ASYNC_WRITE) )       ){        pthread_mutex_unlock(&async.queueMutex);        holdingMutex = 0;      }      if( !pBase->pMethods ){        pBase = p->pFileData->pBaseRead;      }    }    switch( p->op ){      case ASYNC_NOOP:        break;      case ASYNC_WRITE:        assert( pBase );        ASYNC_TRACE(("WRITE %s %d bytes at %d\n",                p->pFileData->zName, p->nByte, p->iOffset));        rc = sqlite3OsWrite(pBase, (void *)(p->zBuf), p->nByte, p->iOffset);        break;      case ASYNC_SYNC:        assert( pBase );        ASYNC_TRACE(("SYNC %s\n", p->pFileData->zName));        rc = sqlite3OsSync(pBase, p->nByte);        break;      case ASYNC_TRUNCATE:        assert( pBase );        ASYNC_TRACE(("TRUNCATE %s to %d bytes\n",                 p->pFileData->zName, p->iOffset));        rc = sqlite3OsTruncate(pBase, p->iOffset);        break;      case ASYNC_CLOSE: {        AsyncFileData *pData = p->pFileData;        ASYNC_TRACE(("CLOSE %s\n", p->pFileData->zName));        sqlite3OsClose(pData->pBaseWrite);        sqlite3OsClose(pData->pBaseRead);        /* Unlink AsyncFileData.lock from the linked list of AsyncFileLock         ** structures for this file. Obtain the async.lockMutex mutex         ** before doing so.        */        pthread_mutex_lock(&async.lockMutex);        rc = unlinkAsyncFile(pData);        pthread_mutex_unlock(&async.lockMutex);        async.pQueueFirst = p->pNext;        sqlite3_free(pData);        doNotFree = 1;        break;      }      case ASYNC_UNLOCK: {        AsyncLock *pLock;        AsyncFileData *pData = p->pFileData;        int eLock = p->nByte;        pthread_mutex_lock(&async.lockMutex);        pData->lock.eAsyncLock = MIN(            pData->lock.eAsyncLock, MAX(pData->lock.eLock, eLock)        );        assert(pData->lock.eAsyncLock>=pData->lock.eLock);        pLock = sqlite3HashFind(&async.aLock, pData->zName, pData->nName);        rc = getFileLock(pLock);        pthread_mutex_unlock(&async.lockMutex);        break;      }      case ASYNC_DELETE:        ASYNC_TRACE(("DELETE %s\n", p->zBuf));        rc = sqlite3OsDelete(pVfs, p->zBuf, (int)p->iOffset);        break;      case ASYNC_OPENEXCLUSIVE: {        int flags = (int)p->iOffset;        AsyncFileData *pData = p->pFileData;        ASYNC_TRACE(("OPEN %s flags=%d\n", p->zBuf, (int)p->iOffset));        assert(pData->pBaseRead->pMethods==0 && pData->pBaseWrite->pMethods==0);        rc = sqlite3OsOpen(pVfs, pData->zName, pData->pBaseRead, flags, 0);        assert( holdingMutex==0 );        pthread_mutex_lock(&async.queueMutex);        holdingMutex = 1;        break;      }      default: assert(!"Illegal value for AsyncWrite.op");    }    /* If we didn't hang on to the mutex during the IO op, obtain it now    ** so that the AsyncWrite structure can be safely removed from the     ** global write-op queue.    */    if( !holdingMutex ){      pthread_mutex_lock(&async.queueMutex);      holdingMutex = 1;    }    /* ASYNC_TRACE(("UNLINK %p\n", p)); */    if( p==async.pQueueLast ){      async.pQueueLast = 0;    }    if( !doNotFree ){      async.pQueueFirst = p->pNext;      sqlite3_free(p);    }    assert( holdingMutex );    /* An IO error has occured. We cannot report the error back to the    ** connection that requested the I/O since the error happened     ** asynchronously.  The connection has already moved on.  There     ** really is nobody to report the error to.    **    ** The file for which the error occured may have been a database or    ** journal file. Regardless, none of the currently queued operations    ** associated with the same database should now be performed. Nor should    ** any subsequently requested IO on either a database or journal file     ** handle for the same database be accepted until the main database    ** file handle has been closed and reopened.    **    ** Furthermore, no further IO should be queued or performed on any file    ** handle associated with a database that may have been part of a     ** multi-file transaction that included the database associated with     ** the IO error (i.e. a database ATTACHed to the same handle at some     ** point in time).    */    if( rc!=SQLITE_OK ){      async.ioError = rc;    }    if( async.ioError && !async.pQueueFirst ){      pthread_mutex_lock(&async.lockMutex);      if( 0==sqliteHashFirst(&async.aLock) ){        async.ioError = SQLITE_OK;      }      pthread_mutex_unlock(&async.lockMutex);    }    /* Drop the queue mutex before continuing to the next write operation    ** in order to give other threads a chance to work with the write queue.    */    if( !async.pQueueFirst || !async.ioError ){      pthread_mutex_unlock(&async.queueMutex);      holdingMutex = 0;      if( async.ioDelay>0 ){        sqlite3OsSleep(pVfs, async.ioDelay);      }else{        sched_yield();      }    }  }    pthread_mutex_unlock(&async.writerMutex);  return 0;}/**************************************************************************** The remaining code defines a Tcl interface for testing the asynchronous** IO implementation in this file.**** To adapt the code to a non-TCL environment, delete or comment out** the code that follows.*//*** sqlite3async_enable ?YES/NO?**** Enable or disable the asynchronous I/O backend.  This command is** not thread-safe.  Do not call it while any database connections** are open.*/static int testAsyncEnable(  void * clientData,  Tcl_Interp *interp,  int objc,  Tcl_Obj *CONST objv[]){  if( objc!=1 && objc!=2 ){    Tcl_WrongNumArgs(interp, 1, objv, "?YES/NO?");    return TCL_ERROR;  }  if( objc==1 ){    Tcl_SetObjResult(interp, Tcl_NewBooleanObj(async_vfs.pAppData!=0));  }else{    int en;    if( Tcl_GetBooleanFromObj(interp, objv[1], &en) ) return TCL_ERROR;    asyncEnable(en);  }  return TCL_OK;}/*** sqlite3async_halt  "now"|"idle"|"never"**** Set the conditions at which the writer thread will halt.*/static int testAsyncHalt(  void * clientData,  Tcl_Interp *interp,  int objc,  Tcl_Obj *CONST objv[]){  const char *zCond;  if( objc!=2 ){    Tcl_WrongNumArgs(interp, 1, objv, "\"now\"|\"idle\"|\"never\"");    return TCL_ERROR;  }  zCond = Tcl_GetString(objv[1]);  if( strcmp(zCond, "now")==0 ){    async.writerHaltNow = 1;    pthread_cond_broadcast(&async.queueSignal);  }else if( strcmp(zCond, "idle")==0 ){    async.writerHaltWhenIdle = 1;    async.writerHaltNow = 0;    pthread_cond_broadcast(&async.queueSignal);  }else if( strcmp(zCond, "never")==0 ){    async.writerHaltWhenIdle = 0;    async.writerHaltNow = 0;  }else{    Tcl_AppendResult(interp,       "should be one of: \"now\", \"idle\", or \"never\"", (char*)0);    return TCL_ERROR;  }  return TCL_OK;}/*** sqlite3async_delay ?MS?**** Query or set the number of milliseconds of delay in the writer** thread after each write operation.  The default is 0.  By increasing** the memory delay we can simulate the effect of slow disk I/O.*/static int testAsyncDelay(  void * clientData,  Tcl_Interp *interp,  int objc,  Tcl_Obj *CONST objv[]){  if( objc!=1 && objc!=2 ){    Tcl_WrongNumArgs(interp, 1, objv, "?MS?");    return TCL_ERROR;  }  if( objc==1 ){    Tcl_SetObjResult(interp, Tcl_NewIntObj(async.ioDelay));  }else{    int ioDelay;    if( Tcl_GetIntFromObj(interp, objv[1], &ioDelay) ) return TCL_ERROR;    async.ioDelay = ioDelay;  }  return TCL_OK;}/*** sqlite3async_start**** Start a new writer thread.*/static int testAsyncStart(  void * clientData,  Tcl_Interp *interp,  int objc,  Tcl_Obj *CONST objv[]){  pthread_t x;  int rc;  volatile int isStarted = 0;  rc = pthread_create(&x, 0, asyncWriterThread, (void *)&isStarted);  if( rc ){    Tcl_AppendResult(interp, "failed to create the thread", 0);    return TCL_ERROR;  }  pthread_detach(x);  while( isStarted==0 ){    sched_yield();  }  return TCL_OK;}/*** sqlite3async_wait**** Wait for the current writer thread to terminate.**** If the current writer thread is set to run forever then this** command would block forever.  To prevent that, an error is returned. */static int testAsyncWait(  void * clientData,  Tcl_Interp *interp,  int objc,  Tcl_Obj *CONST objv[]){  int cnt = 10;  if( async.writerHaltNow==0 && async.writerHaltWhenIdle==0 ){    Tcl_AppendResult(interp, "would block forever", (char*)0);    return TCL_ERROR;  }  while( cnt-- && !pthread_mutex_trylock(&async.writerMutex) ){    pthread_mutex_unlock(&async.writerMutex);    sched_yield();  }  if( cnt>=0 ){    ASYNC_TRACE(("WAIT\n"));    pthread_mutex_lock(&async.queueMutex);    pthread_cond_broadcast(&async.queueSignal);    pthread_mutex_unlock(&async.queueMutex);    pthread_mutex_lock(&async.writerMutex);    pthread_mutex_unlock(&async.writerMutex);  }else{    ASYNC_TRACE(("NO-WAIT\n"));  }  return TCL_OK;}#endif  /* SQLITE_OS_UNIX and SQLITE_THREADSAFE *//*** This routine registers the custom TCL commands defined in this** module.  This should be the only procedure visible from outside** of this module.*/int Sqlitetestasync_Init(Tcl_Interp *interp){#if SQLITE_OS_UNIX && SQLITE_THREADSAFE  Tcl_CreateObjCommand(interp,"sqlite3async_enable",testAsyncEnable,0,0);  Tcl_CreateObjCommand(interp,"sqlite3async_halt",testAsyncHalt,0,0);  Tcl_CreateObjCommand(interp,"sqlite3async_delay",testAsyncDelay,0,0);  Tcl_CreateObjCommand(interp,"sqlite3async_start",testAsyncStart,0,0);  Tcl_CreateObjCommand(interp,"sqlite3async_wait",testAsyncWait,0,0);  Tcl_LinkVar(interp, "sqlite3async_trace",      (char*)&sqlite3async_trace, TCL_LINK_INT);#endif  /* SQLITE_OS_UNIX and SQLITE_THREADSAFE */  return TCL_OK;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -